2017-07-31 28 views
0

匯合的高級消費者here具有以下代碼(爲了簡潔起見而修剪)。Confluent .net(rdkafka)提交消費者處置

 using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8))) 
     { 
      while (!cancelled) 
      { 
       Message<Null, string> msg; 
       if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100))) 
       { 
        continue; 
       }      

       if (msg.Offset % 5 == 0) 
       {       
        consumer.CommitAsync(msg).Result;       
       } 
      } 
     } 

自動提交是錯誤的。 我的問題是,如果'已取消'觸發器被標記,但還有未完成的提交時會發生什麼。這些消息是否未提交,因此會再次收到?我希望消費者會承諾處置,但在實施過程中我看不到這種情況。我可以做一些測試來看看會發生什麼,但是我希望得到一個'官方'的答案,以防我的測試不能涵蓋所有情況。

+0

與實際問題無關,但:它是否是正確的實施?沒有內存隔閡,確保'取消= true;'是否被其他線程觀察過? – zerkms

+0

讓我們說'取消'是揮發性的 – acarlon

回答

1

首先請注意,只有在enable.auto.commit設置爲false(這是Advanced Consumer示例中的情況 - 這需要更多評論和wiki輸入)時,才應該使用CommitAsync。如果使用的是自動提交(默認),廢棄時,您將自動提交(這是當調用rd_kafka_destroy librdkafka側)

在這裏,我們正在等待CommitAsync完成(通過.Result),這就是所謂的與while循環相同的線程,所以不會有「在飛行中」提交請求。

如果您使用手動提交(在完整的代碼片段中就是這種情況),您必須在處理之前手動提交 - 實例中確實缺少,將添加它。如上所述,在手動提交中,您必須自己承諾(有些情況下用戶在處置時不會提交)

我還發現了一個錯誤,如果您調用CommitAsync但不要等到它發生之前(https://github.com/confluentinc/confluent-kafka-dotnet/issues/279)因此,您應該始終等待CommitAsync在臨時解除之前完成

+0

+ 1的處置提示有關commitAsync。但我的問題依然存在 - 未完成的承諾是否會被處置?從我的測試到目前爲止,我認爲答案是否定的。 – acarlon

+0

當enable.auto.commit爲True時,consumer會對dispose進行提交,但在我的情況下,它仍然卡住。最終(70分鐘後)超時:Confluent.Kafka.KafkaException:本地:超時 at Confluent.Kafka.Impl.SafeKafkaHandle.ConsumerClose() –

+0

如果使用自動提交,則會執行。如果指定手動提交,則必須在處理之前調用並等待CommitAsync完成。如果你不等待它,行爲是不可預測的(可能會提交,可能不會,並且可能會拋出異常)。如果不清楚,請詳細說明你的意思是「優秀承諾 @SasaNinkovic我很想知道更多,你可以在這裏詳細說明你的配置或發佈在github上的問題?謝謝! – Treziac