我正在從基於JMS的系統遷移到Kafka,Kafka負責同步新舊系統之間的事務。每個由新系統發佈的消息都必須由用戶/消費者成功處理。我不必擔心消息的順序。由於遺留系統中存在一些設計問題(悲觀鎖定),偶爾有一些消息到達時很少有事務可能失敗,在這種情況下,我希望消息在延遲後回來。我試圖找出如何處理與卡夫卡這種情況。如何使用Confluent Kafka處理消費者端的消息處理失敗?
我的源和目標應用程序在.NET 4.6.1和C#中。我正在使用Confluent.Kafka v0.9.5客戶端庫。卡夫卡版本爲0.10。
對於消費者應用程序,我已禁用自動提交併顯式調用commitAsync方法以在消息成功處理後提交偏移量。以下是我如何創建消費者。
var config = new Dictionary<string, object>()
{
{"group.id", GroupId},
{"client.id", ClientId},
{"enable.auto.commit", false},
{"bootstrap.servers", _consumerConnectionConfig.BrokerUrl},
{
"default.topic.config", new Dictionary<string, object>()
{
{"auto.offset.reset", "latest"}
}
}
};
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8));
這是如何設置輪詢的。
consumer.Subscribe(topics);
while (!SubscriberCancellationToken.IsCancellationRequested)
{
consumer.Poll(TimeSpan.FromMilliseconds(1000));
}
這裏是在onMessage事件負責處理消息
consumer.OnMessage += (sender, message) => {
try
{
var payload = GetPayload(message);
if (_messageHandlerService.ProcessMessage(payload))
{
consumer.CommitAsync(message).Wait(SubscriberCancellationToken);
}
else
{
//(case 1) Now what should I do?????
}
}
catch(Exception ex)
{
Log.Error("Unable to process xyz messages", ex);
throw; //??? (Case 2) should I throw the exception or should I not?
}
};
如果可以成功處理消息,我調用commitAsync和工程就像一個魅力。現在我的問題是我該怎麼做,當我無法處理消息(情況1)或發生某種異常(情況2)時。我有什麼選擇來處理這兩種情況?
在JMS世界中,對於(情況1),我應用了延遲再發布策略。基本上,我等待1分鐘,然後重新發布消息到同一主題並提交當前消息,以便重新發布的消息將再次返回。出於某種原因,如果我不能重新發布,我只會繼續重試,直到我可以或直到重新啓動進程。如果在我能夠重新發布前重新啓動進程,未提交的消息就會回來,循環再次開始。
一旦重新發布成功,如果還有其他消息已經在主題上等待,他們現在將開始移動直到火車繞過一圈,直到一切都被處理完畢。每當我無法處理案例1中的消息時,我會記錄一個錯誤,並根據錯誤生成相應的警報,以便應用程序支持團隊可以採取一些措施,以防萬一他們必須修復遺留系統中的某些數據。在此之前,消息會一直失敗,然後重新發布。
而對於案例2,我記錄了細節並在JMS實現中拋出了一個異常。我現在想知道可能是我應該像案例1一樣處理。
現在我的問題是我應該如何處理卡夫卡世界中的這兩種情況?有沒有更好的選擇?
謝謝。讓我嘗試暫停和恢復功能。我將發佈結果與C#客戶端 – Vinod
對不起,延遲響應。 Confluent.Kafka c#客戶端暫時沒有暫停和恢復方法。 – Vinod