1

實現最終一致的分佈式體系結構變得非常痛苦。有大量的博客文章講述如何做到這一點,但沒有展示(代碼)如何實際做到這一點。Spring雲流Kafka - 最終一致性 - Kafka自動重試未確認的消息(使用autocommitoffset = false時)

我正在遭受的一個方面是不得不處理手動重試消息時,他們還沒有被ack'd。

例如:我的訂單服務向卡夫卡發送了付款事件。支付服務訂閱了它,處理它,付款行或付款失敗回答

  1. 要求支付:Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service

  2. 付款方式確定: - >Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service

  3. 方式失敗 - >Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service

問題是:

我確實知道何時通過使用同步發送將消息發送給Kafka。但是,我必須知道付款服務已經處理了付款的唯一方法是通過預期回答事件(Payment ok | Payment failure)。

這迫使我在訂單服務器中實施重試機制。如果在一段時間內沒有得到答案,請重新嘗試新的Pay事件。

更重要的是,這也迫使我在付款服務中處理重複的郵件,以防他們實際處理,但答案沒有得到訂購服務。

我在想,如果消費者沒有確認消息的新偏移量,卡夫卡是否有內置的機制來發送重試。

在春季雲流,我們可以設置一個autoCommitOffset屬性設置爲false和處理在消費的偏移ACK:

@StreamListener(Sink.INPUT) 
public void process(Message<?> message) { 
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
    if (acknowledgment != null) { 
     System.out.println("Acknowledgment provided"); 
     acknowledgment.acknowledge(); 
    } 
} 

如果我們不執行,會發生什麼acknowledgment.acknowledge();的訊息是自動重發由卡夫卡給這個消費者?

如果可能的話,我們就不需要手動重試任何更多,可以做這樣的東西:

Paymen服務:

@Autowired 
private PaymentBusiness paymentBusiness; 

@StreamListener(Sink.INPUT) 
public void process(Order order) { 
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
    if (acknowledgment != null) { 
     paymentBusiness(order);    
     //If we don't get here because of an exception 
     //Kafka would retry... 
     acknowledgment.acknowledge(); 
    } 
} 

如果這是可能的,是怎樣的重試周期。在Kafka中配置?

在最壞的情況下(也是最有可能的)情況下,這不受支持,我們必須手動重試。你知道使用Kafka處理最終一致性的Spring Cloud Stream應用程序的真實例子嗎?

回答

1

如果我們不執行acknowledgegment.acknowledge(),會發生什麼?這封郵件是否會被卡夫卡自動重新發送給這位消費者?

不是。卡夫卡消費者只要客戶端打開就按順序讀取消息。 Kafka不支持更復雜的確認模式,如單個消息確認,只更新給定用戶組的偏移量和分區主題。 Spring Cloud Stream支持手動確認Spring Cloud Stream中的消息,以便異步處理它們(從而防止消息丟失) - 但是假設一旦手動確認消息,就會保存其偏移量,因此所有以前的消息都來自同一個消息分區主題將被視爲「讀取」。如果你想挑選失敗的消息,你可以使用DLQ支持 - 並讓後續消費者接收它們。重新啓動消費者將從上次保存的偏移量繼續讀取,因此您可以選擇不保存一系列未成功處理的消息的偏移量。

春季云溪消費者已經內置重試和DLQ支持 - 看enableDlqhttp://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_properties以及重試設置設置爲默認用戶屬性的一部分:http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_consumer_properties

相關問題