0
我正在使用Confluent.Kafka dotnet客戶端。卡夫卡消費者提交線程安全
namespace Confluent.Kafka
{
public class Consumer<TKey, TValue> : IDisposable
{
public Task<CommittedOffsets> CommitAsync();
}
}
正如您所見,Consumer.CommitAsync是一種異步方法。在不等待回覆的情況下撥打CommitAsync
方法是否安全,然後再撥打Subscribe
?
下面的示例代碼。
using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer()))
{
consumer.Subscribe(topics);
while (true)
{
Message<MessageKey, byte[]> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
{
// ...
if(msg.Offset % 100 == 0)
{
consumer.CommitAsync().ContinueWith((t) =>
{
// log t.Exception
}, TaskContinuationOptions.OnlyOnFaulted);
}
}
}
}