2017-06-26 35 views
0

我正在使用Confluent.Kafka dotnet客戶端。卡夫卡消費者提交線程安全

namespace Confluent.Kafka 
{ 
    public class Consumer<TKey, TValue> : IDisposable 
    { 
     public Task<CommittedOffsets> CommitAsync(); 
    } 
} 

正如您所見,Consumer.CommitAsync是一種異步方法。在不等待回覆的情況下撥打CommitAsync方法是否安全,然後再撥打Subscribe

下面的示例代碼。

using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer())) 
{ 
       consumer.Subscribe(topics); 

       while (true) 
       { 
        Message<MessageKey, byte[]> msg; 
        if (consumer.Consume(out msg, TimeSpan.FromSeconds(1))) 
        { 
         // ... 

         if(msg.Offset % 100 == 0) 
         { 
          consumer.CommitAsync().ContinueWith((t) => 
          { 
           // log t.Exception 
          }, TaskContinuationOptions.OnlyOnFaulted); 
         } 
        } 
       } 
} 

回答

0

我想你想說的話下次調用消耗

是的,它是安全的,這個沒有問題。 我還會添加一些提交時間窗口(例如5秒和100msgs之間的第一個例子),這樣如果您在一段時間內沒有收到消息,仍然會提交它們