2017-05-05 179 views
0

我正在從基於JMS的系統遷移到Kafka,Kafka負責同步新舊系統之間的事務。每個由新系統發佈的消息都必須由用戶/消費者成功處理。我不必擔心消息的順序。由於遺留系統中存在一些設計問題(悲觀鎖定),偶爾有一些消息到達時很少有事務可能失敗,在這種情況下,我希望消息在延遲後回來。我試圖找出如何處理與卡夫卡這種情況。如何使用Confluent Kafka處理消費者端的消息處理失敗?

我的源和目標應用程序在.NET 4.6.1和C#中。我正在使用Confluent.Kafka v0.9.5客戶端庫。卡夫卡版本爲0.10。

對於消費者應用程序,我已禁用自動提交併顯式調用commitAsync方法以在消息成功處理後提交偏移量。以下是我如何創建消費者。

var config = new Dictionary<string, object>() 
       { 
        {"group.id", GroupId}, 
        {"client.id", ClientId}, 
        {"enable.auto.commit", false}, 
        {"bootstrap.servers", _consumerConnectionConfig.BrokerUrl}, 
        { 
         "default.topic.config", new Dictionary<string, object>() 
         { 
          {"auto.offset.reset", "latest"} 
         } 
        } 
       }; 
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)); 

這是如何設置輪詢的。

consumer.Subscribe(topics); 

while (!SubscriberCancellationToken.IsCancellationRequested) 
{ 
     consumer.Poll(TimeSpan.FromMilliseconds(1000)); 
} 

這裏是在onMessage事件負責處理消息

consumer.OnMessage += (sender, message) => { 
    try 
    { 
    var payload = GetPayload(message); 
    if (_messageHandlerService.ProcessMessage(payload)) 
    {     
     consumer.CommitAsync(message).Wait(SubscriberCancellationToken); 
    } 
    else 
    { 
     //(case 1) Now what should I do????? 
    } 
    } 
    catch(Exception ex) 
    { 
     Log.Error("Unable to process xyz messages", ex); 
     throw; //??? (Case 2) should I throw the exception or should I not? 
    } 
}; 

如果可以成功處理消息,我調用commitAsync和工程就像一個魅力。現在我的問題是我該怎麼做,當我無法處理消息(情況1)或發生某種異常(情況2)時。我有什麼選擇來處理這兩種情況?

在JMS世界中,對於(情況1),我應用了延遲再發布策略。基本上,我等待1分鐘,然後重新發布消息到同一主題並提交當前消息,以便重新發布的消息將再次返回。出於某種原因,如果我不能重新發布,我只會繼續重試,直到我可以或直到重新啓動進程。如果在我能夠重新發布前重新啓動進程,未提交的消息就會回來,循環再次開始。

一旦重新發布成功,如果還有其他消息已經在主題上等待,他們現在將開始移動直到火車繞過一圈,直到一切都被處理完畢。每當我無法處理案例1中的消息時,我會記錄一個錯誤,並根據錯誤生成相應的警報,以便應用程序支持團隊可以採取一些措施,以防萬一他們必須修復遺留系統中的某些數據。在此之前,消息會一直失敗,然後重新發布。

而對於案例2,我記錄了細節並在JMS實現中拋出了一個異常。我現在想知道可能是我應該像案例1一樣處理。

現在我的問題是我應該如何處理卡夫卡世界中的這兩種情況?有沒有更好的選擇?

回答

1

幾個選項

一是這些消息與它推到另一個卡夫卡的話題,讓專門的客戶交易,或者,

重試你的消費者中,直到特定的消息被處理,或者你達到一定的閾。這可以通過調用消費者的暫停方法來臨時停止消息消費(我正在從Java消費者的角度來談,請向C#客戶端確認),運行您的重試邏輯。請確保您也繼續調用輪詢(否則消費者將被踢出組,並且其分區將被重新平衡)。完成重新嘗試後,請致電恢復方法繼續處理 - 現在輪詢方法將返回來自卡夫卡的記錄

+0

謝謝。讓我嘗試暫停和恢復功能。我將發佈結果與C#客戶端 – Vinod

+0

對不起,延遲響應。 Confluent.Kafka c#客戶端暫時沒有暫停和恢復方法。 – Vinod