我正在將一個簡單的Kafka使用者應用程序移出現有框架,並且覺得spring-cloud-stream是一種簡單的方法。我使用Initializr來引導應用程序,該應用程序現在使用Spring-Boot v1.3.3和Spring-Cloud-Stream v1.0.0-RC1。該應用程序非常簡單,只需從Kafka中選擇一條消息,反序列化JSON編碼對象並將其傳遞到我們現有的庫。爲了開始,我只使用了LogSink示例,因爲最終我不會做其他事情(只是反序列化並將對象傳遞給其他方法)。來自EmbeddedHeadersMessageConverter的StringIndexOutOfBoundsException
這一切都很好:它連接到卡夫卡,接收消息並將它傳遞(作爲字節[])到我的接收器。然而,EmbeddedHeadersMessageConverter記錄一個的StringIndexOutOfBoundsException:
2016-04-11 10:06:50.287 ERROR 11464 --- [pool-1-thread-1] fkaMessageChannelBinder$ReceivingHandler : Could not convert message: 7B2267656E65726174696F6E223A3 [...]
java.lang.StringIndexOutOfBoundsException: String index out of range: 2009
at java.lang.String.checkBounds(String.java:373) ~[na:1.8.0_25]
at java.lang.String.<init>(String.java:413) ~[na:1.8.0_25]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:131) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:104) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler.handleRequestMessage(KafkaMessageChannelBinder.java:583) ~[spring-cloud-stream-binder-kafka-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.7.RELEASE.jar:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
https://github.com/spring-cloud/spring-cloud-stream/issues/209似乎預示着問題是缺少卡夫卡頭,這是真實的,沒有任何。但提到的解決方案是添加
spring.cloud.stream.binder.kafka.mode=raw
到我的應用程序配置。不幸的是,這並不適合我。此外,實際上有STS自動完成針對相應的屬性和它建議
spring.cloud.stream.kafka.binder.mode=raw
的2既不(單獨或組合)所做的任何差異,異常仍然被記錄。
我已經使用了Spring多年,但這將是我的第一個Spring-Boot/Spring-Cloud應用程序。
這裏的應用程序代碼:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@SpringBootApplication
public class UpdateApplication {
private static Logger logger = LoggerFactory.getLogger(UpdateApplication.class);
public static void main(String[] args) {
SpringApplication.run(UpdateApplication.class, args);
}
@EnableBinding(Sink.class)
public static class UpdateHandler {
@StreamListener(Sink.INPUT)
//@ServiceActivator(inputChannel=Sink.INPUT)
public void loggerSink(Object payload) {
logger.info("Received: " + payload);
}
}
}
我都嘗試,@ServiceActivator以及@StreamListener註釋,在這種情況下,似乎沒有有所作爲。
我application.properties看起來是這樣的:
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.input.destination=updates
spring.cloud.stream.bindings.input.group=update-client
spring.cloud.stream.kafka.binder.brokers=brokerName
spring.cloud.stream.kafka.binder.zkNodes=zookeeperName
spring.cloud.stream.kafka.binder.mode=raw
任何幫助擺脫這種錯誤的,將不勝感激。
作爲一個方面說明:由於我剛開始帶彈簧雲流試驗我加
spring.cloud.stream.bindings.updates.consumer.resetOffsets=true
spring.cloud.stream.bindings.updates.consumer.startOffset=earlist
的配置,以避免爲每個I重啓時間發送新郵件,但沒有奏效。
嗨 - 這是正確的版本,RC2文檔仍然使用了錯誤的屬性名稱。 HTTP://docs.spring。io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle /#_ consumer_properties(即'spring.cloud.stream.bindings.input.consumer.headerMode = raw') –
啊,謝謝你,@MariusBogoevici,this作品。此外,我的印象是在文檔中指的是實際的目的地(在我的情況下是「更新」),而它應該是「輸入」。剛剛出於好奇:spring.cloud.stream.bindings.input.consumer.resetOffsets = true spring.cloud.stream.bindings.input.consumer.startOffset = earlist 應該從頭開始讀,不?這仍然行不通,但我現在並不需要它。 –
沒錯 - 關鍵是頻道名稱而不是「destination」的值 - 這可以讓您保持邏輯配置,同時還可以在運行時覆蓋目標。 –