I am trying to do a PoC of the "exactly once delivery" concept with Apache Kafka using Spring Cloud Stream + the Kafka binder, and I am hitting this error from the Spring Cloud Stream Kafka binder: "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION".
I installed Apache Kafka ("kafka_2.11-1.0.0") and defined a "transactionIdPrefix" for the producer, which I understand is the only thing I need to do to enable transactions in Spring Kafka. But when I run a simple Source & Sink binding in the same application, I see some messages received and printed by the consumer, while others fail with an error.
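For context, here is a rough sketch (illustrative only, not code from my application) of what producer-side transactions look like in plain Spring Kafka once a transactionIdPrefix is configured on the producer factory; the template wiring and the topic name are just assumptions for the example:

import org.springframework.kafka.core.KafkaTemplate;

// Illustrative only: with a transactionIdPrefix set on the producer factory,
// plain Spring Kafka can publish records atomically via executeInTransaction.
public class TransactionalSendExample {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TransactionalSendExample(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendInOneTransaction() {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("test10", "FromSource1 1"); // both sends commit together
            operations.send("test10", "FromSource1 2"); // or are aborted together
            return null;
        });
    }
}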
For example, message #6 is received:
[49] Received message [Payload String content=FromSource1 6][Headers={kafka_offset=1957, scst_nativeHeadersPresent=true, …@695c9a9, kafka_timestampType=CREATE_TIME, my-transaction-id=my-id-6, id=302cf3ef-a154-fd42-6b43-983778e275dc, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=test10, kafka_receivedTimestamp=1514384106395, timestamp=1514384106419}]
But message #7 fails with "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION":
2017-12-27 16:15:07.405 ERROR 7731 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.…@7d3bbc0b]; nested exception is org.apache.kafka.common.KafkaException: TransactionalId my-transaction-3: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, failedMessage=GenericMessage [payload=byte[13], headers={my-transaction-id=my-id-7, id=d31656af-3286-99b0-c736-d53aa57a5e65, contentType=application/json, timestamp=1514384107399}]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:575)
- What does this error mean?
- Is something missing from my configuration?
- Do I need to implement the Source or the Sink differently when transactions are enabled?
UPDATE: I opened an issue on the project's GitHub; please see the discussion there.
I could not find how to use Spring Cloud Stream with the Kafka binder with transactions enabled.
To reproduce: create a simple Maven project with Spring Boot version "2.0.0.M5" and "spring-cloud-stream-dependencies" version "Elmhurst.M3", and create a simple application with the following configuration:
server:
  port: 8082
spring:
  kafka:
    producer:
      retries: 5555
      acks: "all"
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
          transaction:
            transactionIdPrefix: my-transaction-
      bindings:
        output1:
          destination: test10
          group: test111
          binder: kafka
        input1:
          destination: test10
          group: test111
          binder: kafka
          consumer:
            partitioned: true
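For completeness, here is a minimal sketch of the Spring Boot entry point I assume for this reproduction project (the class name is made up for the example):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Illustrative only: a plain Spring Boot main class to run the application
// with the configuration above.
@SpringBootApplication
public class TransactionPocApplication {

    public static void main(String[] args) {
        SpringApplication.run(TransactionPocApplication.class, args);
    }
}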
I also created simple Sink and Source classes:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(SampleSink.MultiInputSink.class)
public class SampleSink {

    // Prints every message received on the "input1" binding.
    @StreamListener(MultiInputSink.INPUT1)
    public synchronized void receive1(Message<?> message) {
        System.out.println("[" + Thread.currentThread().getId() + "] Received message " + message);
    }

    public interface MultiInputSink {
        String INPUT1 = "input1";
        @Input(INPUT1)
        SubscribableChannel input1();
    }
}
and:
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(SampleSource.MultiOutputSource.class)
public class SampleSource {

    private final AtomicInteger atomicInteger = new AtomicInteger(1);

    // Polled every second; each poll emits one message on "output1" with a unique "my-transaction-id" header.
    @Bean
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public synchronized MessageSource<String> messageSource1() {
        return new MessageSource<String>() {
            public Message<String> receive() {
                String message = "FromSource1 " + atomicInteger.getAndIncrement();
                Map<String, Object> m = new HashMap<>(); // headers map (missing in my earlier snippet)
                m.put("my-transaction-id", "my-id-" + UUID.randomUUID());
                return new GenericMessage<>(message, new MessageHeaders(m));
            }
        };
    }

    public interface MultiOutputSource {
        String OUTPUT1 = "output1";
        @Output(OUTPUT1)
        MessageChannel output1();
    }
}
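As a side note, the custom "my-transaction-id" header attached in messageSource1() can also be bound directly as a listener argument; the class below is only an illustration using the standard Sink interface, not part of my actual code:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

// Illustrative only: binds the custom header as a method parameter instead of
// digging it out of the Message headers by hand.
@EnableBinding(Sink.class)
public class HeaderAwareSink {

    @StreamListener(Sink.INPUT)
    public void receive(@Payload String payload,
                        @Header("my-transaction-id") String transactionId) {
        System.out.println("Received '" + payload + "' with my-transaction-id=" + transactionId);
    }
}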