2016-04-11 122 views
2

我正在將一個簡單的Kafka使用者應用程序移出現有框架,並且覺得spring-cloud-stream是一種簡單的方法。我使用Initializr來引導應用程序,該應用程序現在使用Spring-Boot v1.3.3和Spring-Cloud-Stream v1.0.0-RC1。該應用程序非常簡單,只需從Kafka中選擇一條消息,反序列化JSON編碼對象並將其傳遞到我們現有的庫。爲了開始,我只使用了LogSink示例,因爲最終我不會做其他事情(只是反序列化並將對象傳遞給其他方法)。來自EmbeddedHeadersMessageConverter的StringIndexOutOfBoundsException

這一切都很好:它連接到卡夫卡,接收消息並將它傳遞(作爲字節[])到我的接收器。然而,EmbeddedHeadersMessageConverter記錄一個的StringIndexOutOfBoundsException:

2016-04-11 10:06:50.287 ERROR 11464 --- [pool-1-thread-1] fkaMessageChannelBinder$ReceivingHandler : Could not convert message: 7B2267656E65726174696F6E223A3 [...] 
java.lang.StringIndexOutOfBoundsException: String index out of range: 2009 
    at java.lang.String.checkBounds(String.java:373) ~[na:1.8.0_25] 
    at java.lang.String.<init>(String.java:413) ~[na:1.8.0_25] 
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:131) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1] 
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:104) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1] 
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler.handleRequestMessage(KafkaMessageChannelBinder.java:583) ~[spring-cloud-stream-binder-kafka-1.0.0.RC1.jar:1.0.0.RC1] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) [spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) [spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) [spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE] 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) [spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) [spring-integration-kafka-1.3.0.RELEASE.jar:na] 
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) [spring-integration-kafka-1.3.0.RELEASE.jar:na] 
    at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) [spring-integration-kafka-1.3.0.RELEASE.jar:na] 
    at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.0.RELEASE.jar:na] 
    at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.0.RELEASE.jar:na] 
    at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.7.RELEASE.jar:na] 
    at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.7.RELEASE.jar:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25] 

https://github.com/spring-cloud/spring-cloud-stream/issues/209似乎預示着問題是缺少卡夫卡頭,這是真實的,沒有任何。但提到的解決方案是添加

spring.cloud.stream.binder.kafka.mode=raw 

到我的應用程序配置。不幸的是,這並不適合我。此外,實際上有STS自動完成針對相應的屬性和它建議

spring.cloud.stream.kafka.binder.mode=raw 

的2既不(單獨或組合)所做的任何差異,異常仍然被記錄。

我已經使用了Spring多年,但這將是我的第一個Spring-Boot/Spring-Cloud應用程序。

這裏的應用程序代碼:

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.annotation.StreamListener; 
import org.springframework.cloud.stream.messaging.Sink; 
import org.springframework.integration.annotation.ServiceActivator; 

@SpringBootApplication 
public class UpdateApplication { 
    private static Logger logger = LoggerFactory.getLogger(UpdateApplication.class); 

    public static void main(String[] args) { 
     SpringApplication.run(UpdateApplication.class, args); 
    } 

    @EnableBinding(Sink.class) 
    public static class UpdateHandler { 

     @StreamListener(Sink.INPUT) 
     //@ServiceActivator(inputChannel=Sink.INPUT) 
     public void loggerSink(Object payload) { 
      logger.info("Received: " + payload); 
     }   
    } 
} 

我都嘗試,@ServiceActivator以及@StreamListener註釋,在這種情況下,似乎沒有有所作爲。

我application.properties看起來是這樣的:

spring.cloud.stream.bindings.input.binder=kafka 
spring.cloud.stream.bindings.input.destination=updates 
spring.cloud.stream.bindings.input.group=update-client 
spring.cloud.stream.kafka.binder.brokers=brokerName 
spring.cloud.stream.kafka.binder.zkNodes=zookeeperName 
spring.cloud.stream.kafka.binder.mode=raw 

任何幫助擺脫這種錯誤的,將不勝感激。

作爲一個方面說明:由於我剛開始帶彈簧雲流試驗我加

spring.cloud.stream.bindings.updates.consumer.resetOffsets=true 
spring.cloud.stream.bindings.updates.consumer.startOffset=earlist 

的配置,以避免爲每個I重啓時間發送新郵件,但沒有奏效。

回答

4

由於RC該選項已被移至.consumer.配置選項。

所以,現在你必須這樣做:

spring.cloud.stream.bindings.input.consumer.mode=raw 

看到Reference Manual更多信息。

+2

嗨 - 這是正確的版本,RC2文檔仍然使用了錯誤的屬性名稱。 HTTP://docs.spring。io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle /#_ consumer_properties(即'spring.cloud.stream.bindings.input.consumer.headerMode = raw') –

+0

啊,謝謝你,@MariusBogoevici,this作品。此外,我的印象是在文檔中指的是實際的目的地(在我的情況下是「更新」),而它應該是「輸入」。剛剛出於好奇:spring.cloud.stream.bindings.input.consumer.resetOffsets = true spring.cloud.stream.bindings.input.consumer.startOffset = earlist 應該從頭開始讀,不?這仍然行不通,但我現在並不需要它。 –

+0

沒錯 - 關鍵是頻道名稱而不是「destination」的值 - 這可以讓您保持邏輯配置,同時還可以在運行時覆蓋目標。 –

0

spring.cloud.stream.bindings.input.consumer.headerMode=raw

正在爲版本1.1.0.RELEASE。

相關問題