2017-06-16 29 views
1

我對使用EventProcessorHost和IEventProcessor非常新,我試圖弄清楚如何從EventProcessorClass中獲取數據。如果我只是想將新消息記錄到控制檯,我現在已經完成了所有工作。從EventProcessorHost獲取數據

我目前的實現(我甚至不確定它是否可以接受,甚至是不錯的做法)創建一個靜態變量,然後將數據存儲在它中,以便其他處理器可以收集它。這是可以做的還是有更好的更清潔的方式來訪問數據?

這是我迄今爲止(鎖定機制是非常基本的,當我得到的代碼的其餘工作將是固定的):

internal class Receiver 
{ 
    public static List<string> incommingMessagesList = new List<string>(); 
    public static bool fIsDataListLocked = false; 

    private EventProcessorHost m_EPHClient; 

    ... 

    Console.WriteLine("Registering EventProcessor..."); 
    await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>(); 
} 
public class SimpleEventProcessor : IEventProcessor 
{ 
    ... 

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     foreach(var eventData in messages) 
     { 
      while(!Receiver.fIsDataListLocked) 
      { 
       Receiver.fIsDataListLocked = true ; 
       Receiver.incommingMessagesList.Add(Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count)); 
       Receiver.fIsDataListLocked = false ; 
      } 

     } 
     return context.CheckpointAsync(); 
    } 
} 

更新:

根據要求一點更多信息:

基本上我從一個流水線的兩個不同端拉取數據以驗證所有消息通過並跟蹤其吞吐量,一端是eventhub,另一端是來自lwm2m服務器的HTTP請求。所以我有一個控制器進程運行,需要從兩端獲取數據以清理/分析數據。就像我說的,我是新來的事件處理器,但它沒有任何意義,讓我EventProcessorHost處理收集兩套數據,然後清理/分析它。我絕對可以改變做法,但看起來很笨重。

+0

您想在生產中存儲數據的位置?數據庫?創建另一種處理器來處理incomingMessagesList中的數據有什麼意義?這是沒有意義的。 EventProcessor應該處理數據。 (對於線程安全集合,請參閱https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/) –

+0

您需要詳細說明您的最終目標是什麼。通常,處理器是無狀態的 - 它們將數據加載並保存到某個外部持久存儲區。 – Mikhail

回答

1

在典型的場景中,事件處理器以儘可能最快的方式接收和保存數據。多個事件處理器實例將從不同的EventHub分區讀取數據。

對於您的情況,您希望將數據發送到其他位置,並在那裏結合另一個數據流進行處理。一個內存中的集合像一個列表可能不是這樣做的最佳方式:

  • 它必須是線程安全的
  • 在崩潰數據將丟失
  • 您需要手動刪除處理數據以防止不斷增長的收集

您將需要某種生產者/消費者實現。

一個可能的解決方案是將兩個數據流寫入單個目標,如Azure存儲隊列。這有一個主要優點,即在出現故障時,所有數據仍然保留並不會丟失。您的最終處理器可以從隊列中以自己的速度讀取。

+0

謝謝彼得,這對我很有意義。我原來的計劃實際上是一個內存解決方案,它管理着「* list *」,因爲數據不需要保存,它基本上被取出處理,然後在數據分析時被轉儲。儘管如此,我會考慮使用sb隊列來簡單起見。 – Tommy