我挖成Apache Kafka與Spring Cloud Stream和觀察到一些行爲使我不知道如果我做錯了什麼,或者如果如預期運行 - 這我幾乎懷疑:Apache Kafka - 錯誤時可能會丟失郵件嗎?
有可能失去上的錯誤消息!?
我的設置儘可能簡單。一個卡夫卡經紀人和一個只有一個分區的主題。經紀人,主題,製作人和消費者都使用默認設置(自動確認爲真)。
測試用例1
- 產生
message1
- 產生
message2
- 啓動一個消費者,這將扔在接收任何消息
- 消耗
message1
一個RuntimeException,重試 - 消耗
message1
,重試 - 消費
message1
,重試 - 拋出異常
- 消費
message2
,重試 - 消費
message2
,重試 - 消費
message2
,重試 - 拋出異常
- 停止和重新啓動消費者
- 消費
message1
,重試 - 消耗
message1
,重試 - 消耗
message1
,重試 - 拋出異常
- 消耗
message2
,重試 - 消耗
message2
,重試 - 消耗
message2
,重試 - 拋出異常
按預期工作。
測試用例2
- 產生
message1
- 產生
message2
- 啓動消費,將扔在接受正是
message1
- 消耗
message1
一個RuntimeException,重試 - 消費
message1
,重試 - 消費
message1
,重試 - 拋出異常
- 成功消費
message2
- 產生
message3
- 成功消費
message3
- 停止和重新啓動消費
- 沒有反應,則消費者等待新消息消耗
message1
將被跳過,因爲提交的偏移量已被設置爲message3
。這讓我很困擾。只要前面的消息沒有成功處理,我不希望消費者繼續使用消息。
有沒有人經歷過相同的行爲和/或可能可以指導我如何改變這種情況?
在此先感謝!
更新:的要求,某些代碼段
與
創建主題kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
連接生產者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
創建一個Maven項目
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.7.RELEASE</version>
<relativePath/>
</parent>
...
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
添加以下application.yml
spring:
cloud:
stream:
bindings:
input:
destination: test-topic
contentType: text/plain
group: test-group
consumer:
header-mode: raw
kafka:
binder:
zkNodes: localhost:2181
brokers: localhost:9092
添加以下Application.java
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
private void consume(Message<String> message) {
log.info("Received: {}", message.getPayload());
if ("message1".equals(message.getPayload())
throw new RuntimeException();
log.info("Successfully processed message {}", message.getPayload());
}
}
這應該是它。運行應用程序並使用console-producer生成消息。
爲什麼消費者在使用testcase2消耗message1失敗時消耗message2? – herokingsley
@herokingsley我不知道,但這是發生了什麼。如果在使用'message1'嘗試失敗後不會消耗'message2',那麼我會很滿意。 – stphngrtz
也許向我們展示一些代碼或日誌將有所幫助 – herokingsley