2017-10-12 27 views
1

我挖成Apache KafkaSpring Cloud Stream和觀察到一些行爲使我不知道如果我做錯了什麼,或者如果如預期運行 - 這我幾乎懷疑:Apache Kafka - 錯誤時可能會丟失郵件嗎?

有可能失去上的錯誤消息!?

我的設置儘可能簡單。一個卡夫卡經紀人和一個只有一個分區的主題。經紀人,主題,製作人和消費者都使用默認設置(自動確認爲真)。

測試用例1

  • 產生message1
  • 產生message2
  • 啓動一個消費者,這將扔在接收任何消息
  • 消耗message1一個RuntimeException,重試
  • 消耗message1,重試
  • 消費message1,重試
  • 拋出異常
  • 消費message2,重試
  • 消費message2,重試
  • 消費message2,重試
  • 拋出異常
  • 停止和重新啓動消費者
  • 消費message1,重試
  • 消耗message1,重試
  • 消耗message1,重試
  • 拋出異常
  • 消耗message2,重試
  • 消耗message2,重試
  • 消耗message2,重試
  • 拋出異常

按預期工作。

測試用例2

  • 產生message1
  • 產生message2
  • 啓動消費,將扔在接受正是message1
  • 消耗message1一個RuntimeException,重試
  • 消費message1,重試
  • 消費message1,重試
  • 拋出異常
  • 成功消費message2
  • 產生message3
  • 成功消費message3
  • 停止和重新啓動消費
  • 沒有反應,則消費者等待新消息消耗

message1將被跳過,因爲提交的偏移量已被設置爲message3。這讓我很困擾。只要前面的消息沒有成功處理,我不希望消費者繼續使用消息。

有沒有人經歷過相同的行爲和/或可能可以指導我如何改變這種情況?

在此先感謝!


更新:的要求,某些代碼段

創建主題

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic 

連接生產者

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 

創建一個Maven項目

<parent> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-parent</artifactId> 
    <version>1.5.7.RELEASE</version> 
    <relativePath/> 
</parent> 

... 

<dependencyManagement> 
    <dependencies> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-dependencies</artifactId> 
      <version>Dalston.SR4</version> 
      <type>pom</type> 
      <scope>import</scope> 
     </dependency> 
    </dependencies> 
</dependencyManagement> 


<dependencies> 
    <dependency> 
     <groupId>org.springframework.cloud</groupId> 
     <artifactId>spring-cloud-starter-stream-kafka</artifactId> 
    </dependency> 
</dependencies> 

添加以下application.yml

spring: 
    cloud: 
    stream: 
     bindings: 
     input: 
      destination: test-topic 
      contentType: text/plain 
      group: test-group 
      consumer: 
      header-mode: raw 
     kafka: 
     binder: 
      zkNodes: localhost:2181 
      brokers: localhost:9092 

添加以下Application.java

@SpringBootApplication 
@EnableBinding(Sink.class) 
public class Application { 

    private static final Logger log = LoggerFactory.getLogger(Application.class); 

    public static void main(String[] args) { 
     SpringApplication.run(Application.class, args); 
    } 

    @StreamListener(Sink.INPUT) 
    private void consume(Message<String> message) { 
     log.info("Received: {}", message.getPayload()); 
     if ("message1".equals(message.getPayload()) 
      throw new RuntimeException(); 
     log.info("Successfully processed message {}", message.getPayload()); 
    } 
} 

這應該是它。運行應用程序並使用console-producer生成消息。

+0

爲什麼消費者在使用testcase2消耗message1失敗時消耗message2? – herokingsley

+0

@herokingsley我不知道,但這是發生了什麼。如果在使用'message1'嘗試失敗後不會消耗'message2',那麼我會很滿意。 – stphngrtz

+1

也許向我們展示一些代碼或日誌將有所幫助 – herokingsley

回答

0

卡夫卡給你一個運行時間,但你有選擇的權力。在某些情況下,消息可能會丟失/跳過,有些消息可能不會消失 - 您需要根據需要準備配置。國際海事組織,你應該進一步調查一些春季雲流設置。你也可以玩弄禁用自動提交和「手動」抵消抵消。

+0

我會給人工提交一個嘗試,但我幾乎不能相信我所描述的行爲對於大多數情況都是可以的。默認情況下,'message1'應該阻止'message2',直到成功處理(然後確認),如果你不想阻塞,那麼你將不得不改變配置 - 但這只是我,也許我沒有得到大的圖片馬上。 – stphngrtz

+0

只是禁用自動提交是不夠的。您需要跟蹤確認的消息並手動檢查即將處理的消息是否在處理消息之前處於下一行。這絕對不令我滿意。 – stphngrtz

0

在卡夫卡,每條消息都帶有一個偏移ID。您的消費者應用程序可以對偏移量進行檢查,並且是否跳過或忽略了偏移量而不是消耗下一條消息。您可以使用consumer.seek方法獲取缺少的特定消息。

抵消本質上是遞增的,並且是連續的。

並在你的情況下使用手動提交。

我可以說使用下面的步驟..

  1. 輪詢方法後,首先檢查先前提交的偏移和 ,並請求下一個偏移值

  2. 一旦消息被消耗,並且成功地處理,請在內存或表格的內部 中保存 已成功處理的消息的偏移值。在接下來的調查

下面的鏈接將不會爲你的用例,但你可以得到公平的想法

參考Example

+0

stphngrtz沒有使用原生卡夫卡API – Arek

+0

無論哪個API,邏輯相同.. –

+0

我同意,但SCS提供了很多抽象。無論如何,你是對的,自我管理抵消是一種選擇。 – Arek

0

您應該配置這種情況下,DLQ。如果您的郵件在重試3次後無法使用,則很可能根本不會使用郵件,或需要特殊處理。 如果只有一個分區,設置一個有毒消息可以着陸的DLQ,並且不會丟失消息

+0

但是將'message1'移動到DLQ會打亂順序,對吧?移動'message1'到DLQ後,線程將繼續'message2'。 – stphngrtz

+0

它不會破壞順序。它不像message2在message1之前被消費。但是,除非你想停止所有錯誤消息的消費,並且因此不提供偏移量,否則在任何面向消息的系統中都會發生這種情況。錯誤消息被髮送到一個特殊的地方,流程繼續。否則,如果有任何錯誤,系統會停止 –