我對使用EventProcessorHost和IEventProcessor非常新,我試圖弄清楚如何從EventProcessorClass中獲取數據。如果我只是想將新消息記錄到控制檯,我現在已經完成了所有工作。從EventProcessorHost獲取數據
我目前的實現(我甚至不確定它是否可以接受,甚至是不錯的做法)創建一個靜態變量,然後將數據存儲在它中,以便其他處理器可以收集它。這是可以做的還是有更好的更清潔的方式來訪問數據?
這是我迄今爲止(鎖定機制是非常基本的,當我得到的代碼的其餘工作將是固定的):
internal class Receiver
{
public static List<string> incommingMessagesList = new List<string>();
public static bool fIsDataListLocked = false;
private EventProcessorHost m_EPHClient;
...
Console.WriteLine("Registering EventProcessor...");
await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}
public class SimpleEventProcessor : IEventProcessor
{
...
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach(var eventData in messages)
{
while(!Receiver.fIsDataListLocked)
{
Receiver.fIsDataListLocked = true ;
Receiver.incommingMessagesList.Add(Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count));
Receiver.fIsDataListLocked = false ;
}
}
return context.CheckpointAsync();
}
}
更新:
根據要求一點更多信息:
基本上我從一個流水線的兩個不同端拉取數據以驗證所有消息通過並跟蹤其吞吐量,一端是eventhub,另一端是來自lwm2m服務器的HTTP請求。所以我有一個控制器進程運行,需要從兩端獲取數據以清理/分析數據。就像我說的,我是新來的事件處理器,但它沒有任何意義,讓我EventProcessorHost處理收集兩套數據,然後清理/分析它。我絕對可以改變做法,但看起來很笨重。
您想在生產中存儲數據的位置?數據庫?創建另一種處理器來處理incomingMessagesList中的數據有什麼意義?這是沒有意義的。 EventProcessor應該處理數據。 (對於線程安全集合,請參閱https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/) –
您需要詳細說明您的最終目標是什麼。通常,處理器是無狀態的 - 它們將數據加載並保存到某個外部持久存儲區。 – Mikhail