2015-06-29 46 views
2

我目前正在使用EventProcessorHost和一個簡單的IEventProcessor實現來實現Event Hub讀取器。我已經使用Paolo Salvatori出色的Service Bus Explorer確認遙測數據正在寫入事件中心。我已成功配置EventProcessorHost,使其使用存儲帳戶進行租約管理和檢查點。我可以在存儲帳戶中看到Event Hub數據文件。我現在遇到的問題是,IEventProcessor實現似乎沒有從Event Hub中讀取任何內容。

IEventProcessor未從事件中心讀取

我沒有收到任何異常。測試控制檯應用程序可以正常連接到存儲帳戶。我注意到我添加到構造函數中的日誌語句從未被調用,所以看起來接收器從來沒有真正被創建。我覺得我遺漏了一些簡單的東西。有人可以幫我確定我遺漏了什麼嗎?謝謝!

IEventProcessor實現:

namespace Receiver
{
    /// <summary>
    /// Minimal <see cref="IEventProcessor"/> implementation that writes every received
    /// event to the console and checkpoints its partition periodically.
    /// </summary>
    internal class SimpleEventProcessor : IEventProcessor
    {
        // Tracks the time elapsed since the last checkpoint; created and started in OpenAsync.
        private Stopwatch _checkPointStopwatch;

        /// <summary>
        /// Creates the processor and logs the fact, which makes it visible on the console
        /// whether an instance was ever constructed.
        /// </summary>
        public SimpleEventProcessor()
        {
            Console.WriteLine("SimpleEventProcessor created");
        }

        #region Implementation of IEventProcessor

        /// <summary>
        /// Logs the partition id and lease offset, then starts the checkpoint timer.
        /// </summary>
        /// <param name="context">Partition context whose lease supplies the logged values.</param>
        /// <returns>An already-completed task.</returns>
        public Task OpenAsync(PartitionContext context)
        {
            // Fixed: the original format string was missing the closing quote after {1}.
            Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'",
                context.Lease.PartitionId, context.Lease.Offset);
            _checkPointStopwatch = new Stopwatch();
            _checkPointStopwatch.Start();
            return Task.FromResult<object>(null);
        }

        /// <summary>
        /// Decodes each event body as UTF-8 and writes it to the console, then checkpoints
        /// if more than 30 seconds have elapsed since the timer was last (re)started.
        /// </summary>
        /// <param name="context">Partition context used for logging and checkpointing.</param>
        /// <param name="messages">Batch of events to print.</param>
        public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var data in messages.Select(eventData => Encoding.UTF8.GetString(eventData.GetBytes())))
            {
                Console.WriteLine("Message received. Partition: '{0}', Data: '{1}'", context.Lease.PartitionId,
                    data);
            }

            // Checkpoint on a timer rather than after every batch.
            if (_checkPointStopwatch.Elapsed > TimeSpan.FromSeconds(30))
            {
                await context.CheckpointAsync();
                _checkPointStopwatch.Restart();
            }
        }

        /// <summary>
        /// Logs the close reason and writes a final checkpoint when the reason is
        /// <see cref="CloseReason.Shutdown"/>.
        /// </summary>
        /// <param name="context">Partition context used for logging and checkpointing.</param>
        /// <param name="reason">Why the processor is being closed.</param>
        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine("Processor shutting down. Partition '{0}', Reason: {1}", context.Lease.PartitionId,
                reason);

            // Only checkpoint on Shutdown; any other close reason skips the final checkpoint.
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        #endregion
    }
}

測試控制檯代碼:

namespace EventHubTestConsole
{
    /// <summary>
    /// Test console that registers <c>SimpleEventProcessor</c> with an
    /// <c>EventProcessorHost</c> and waits for the user to press Enter.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Entry point; hands <see cref="MainAsync"/> to <c>AsyncPump.Run</c>.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        private static void Main(string[] args)
        {
            // AsyncPump is defined outside this snippet; presumably it runs the async
            // entry point to completion on this thread - confirm against its source.
            AsyncPump.Run((Func<Task>) MainAsync);
        }

        /// <summary>
        /// Builds the storage connection string, creates the host with a random name,
        /// registers the processor and blocks until a line is read from the console.
        /// </summary>
        private static async Task MainAsync()
        {
            const string eventHubConnectionString =
                "Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
            const string eventHubName = "<event hub name>";
            const string storageAccountName = "<storage account name>";
            const string storageAccountKey = "<valid storage key>";
            var storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                storageAccountName, storageAccountKey);
            Console.WriteLine("Connecting to storage account with ConnectionString: {0}", storageConnectionString);

            var eventProcessorHostName = Guid.NewGuid().ToString();
            var eventProcessorHost = new EventProcessorHost(
                eventProcessorHostName,
                eventHubName,
                EventHubConsumerGroup.DefaultGroupName,
                eventHubConnectionString,
                storageConnectionString);

            // NOTE(review): these are the option values as reported in the question.
            // The answer on this page reports that RegisterEventProcessorAsync throws
            // when PrefetchCount is below 10 - confirm against the SDK documentation.
            var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100,
                PrefetchCount = 1,
                ReceiveTimeOut = TimeSpan.FromSeconds(20),
                InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
            };

            epo.ExceptionReceived += OnExceptionReceived;

            await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo);

            Console.WriteLine("Receiving. Please enter to stop worker.");
            Console.ReadLine();
        }

        /// <summary>
        /// Writes the message of an exception raised through
        /// <c>EventProcessorOptions.ExceptionReceived</c> to the console.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="args">Event data carrying the exception.</param>
        public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
        {
            Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
        }
    }
}
+1

嗯,我想知道這是不是Azure目前的一個問題。我的應用程序全都無法與Azure事件中心通信。 –

+0

我仍然可以通過服務總線瀏覽器進行連接。我已經查看了相關的源代碼,連接過程似乎比簡單教程所描述的要複雜得多。目前我打算先直接使用消費者(direct consumer),之後再回頭處理EventProcessorHost層。感謝您的評論。 – MichaelMilom

回答

0

看起來問題出在你為EventProcessorOptions.PrefetchCount設置的值上。

我按如下所示修改了你的代碼(移除了AsyncPump,並乾淨地關閉接收器)。我發現,如果PrefetchCount小於10,RegisterEventProcessorAsync會拋出異常。

namespace EventHubTestConsole
{
    /// <summary>
    /// Test console that registers <c>SimpleEventProcessor</c> with an
    /// <c>EventProcessorHost</c>, waits for Enter, then unregisters the processor.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Entry point: creates the host, registers the processor with
        /// <c>PrefetchCount = 10</c>, and unregisters it once a line is read from the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        private static void Main(string[] args)
        {
            const string eventHubConnectionString =
                "Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
            const string eventHubName = "<event hub name>";
            const string storageAccountName = "<storage account name>";
            const string storageAccountKey = "<valid storage key>";
            var storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
                storageAccountName, storageAccountKey);

            // Log only the account name: the full connection string embeds the account key,
            // which should not be echoed to the console.
            Console.WriteLine("Connecting to storage account: {0}", storageAccountName);

            var eventProcessorHostName = Guid.NewGuid().ToString();
            var eventProcessorHost = new EventProcessorHost(
                eventProcessorHostName,
                eventHubName,
                EventHubConsumerGroup.DefaultGroupName,
                eventHubConnectionString,
                storageConnectionString);

            var epo = new EventProcessorOptions
            {
                MaxBatchSize = 100,
                PrefetchCount = 10,
                ReceiveTimeOut = TimeSpan.FromSeconds(20),
                // Start-time offsets are interpreted as UTC by the SDK, so use UtcNow
                // rather than the machine's local time.
                InitialOffsetProvider = (name) => DateTime.UtcNow.AddDays(-7)
            };

            epo.ExceptionReceived += OnExceptionReceived;

            // GetAwaiter().GetResult() rethrows the original exception, whereas Wait()
            // would wrap a registration failure in an AggregateException.
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo).GetAwaiter().GetResult();

            Console.WriteLine("Receiving. Please enter to stop worker.");
            Console.ReadLine();
            eventProcessorHost.UnregisterEventProcessorAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Writes the message of an exception raised through
        /// <c>EventProcessorOptions.ExceptionReceived</c> to the console.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="args">Event data carrying the exception.</param>
        public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
        {
            Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
        }
    }
}