2015-05-12 39 views
0

是否有可能在春季xd運行時將kafka源模塊用作處理器模塊?任何代碼示例?如何讓kafka在spring xd運行時使用http流數據?

我想實現這樣的事情:http(xd source)| kafka源(xd處理器)| kafka消費者(xd接收器)

我正在嘗試這樣做,因爲我有通過http傳輸的流數據,我想用kafka消息總線進行管理。

我的流定義是這樣的:

stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy 

撲滅在彈簧XD結果的處理器模塊的盒實現卡夫卡源模塊執行的一個錯誤是這樣的:

2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message 

有機springframework.messaging.MessageDeliveryException:調度程序沒有用戶通道'admin:default,admin,singlenode,hsqldbServer:9393.kafkaSourceTest.0'。嵌套異常是org.springframework.integration.MessageDispatchingException:分派器沒有訂戶 at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel的.java:277) 在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239) 在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 在org.springframework .messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)

回答

3

我想d這是因爲我有通過http發送的流數據,我想用kafka消息總線進行管理。

如果您使用kafka作爲messagebus(設置傳輸之後),則類似「http | log」的流將通過kafka messagebus傳輸http消息。在這種情況下,卡夫卡經紀人的話題將由XD內部人員定義。

是否有可能在春季xd運行時將kafka源模塊作爲處理器模塊工作?

不,源模塊不能充當處理器模塊。如果您希望消息在Kafka中流過特定主題,那麼您可以有一個流,該流具有一個從http源接收數據的kafka接收器模塊,以及另一個使用相同主題配置kafka源模塊的流。

和,這可以這樣來實現:

流創建KafkaSink --definition的 「http --outputType =應用/ JSON |卡夫卡--brokerList = --topic = kafkaTestTopic」 --deploy

stream create KafkaSource --definition「kafka --zkconnect = localhost:2181 --topic = kafkaTestTopic | log」--deploy

相關問題