我想使用Spring Cloud Stream App Starter TCP Source project(maven工件)以便能夠通過套接字/端口接收TCP消息,處理它們然後將結果推送到消息經紀人(如RabbitMQ)。如何使用Spring Cloud Stream應用程序啓動程序處理消息TCP
這個TCP源項目似乎正是我想要的,但它會自動將收到的消息發送到輸出通道。那麼,是否有一種乾淨的方式仍然使用TCP源項目,但攔截TCP傳入消息在內部轉換它們之前將它們輸出到我的消息代理?
我想使用Spring Cloud Stream App Starter TCP Source project(maven工件)以便能夠通過套接字/端口接收TCP消息,處理它們然後將結果推送到消息經紀人(如RabbitMQ)。如何使用Spring Cloud Stream應用程序啓動程序處理消息TCP
這個TCP源項目似乎正是我想要的,但它會自動將收到的消息發送到輸出通道。那麼,是否有一種乾淨的方式仍然使用TCP源項目,但攔截TCP傳入消息在內部轉換它們之前將它們輸出到我的消息代理?
請參閱aggregation。
您使用源和處理器創建聚合應用程序。
Spring Cloud Stream支持將多個應用程序聚合在一起,直接連接其輸入和輸出通道,避免了通過代理交換消息的額外成本。隨着春天的云溪的1.0版本,聚集僅支持以下類型的應用:
源,匯處理器...
它們可以通過創建互連應用的序列聚集在一起,在如果序列中元素的輸出通道連接到下一個元素的輸入通道(如果存在)。一個序列可以從一個源或一個處理器開始,它可以包含任意數量的處理器,並且必須以處理器或接收器結束。
編輯
作爲一個變通的源自動裝配的問題,你可以嘗試像...
@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpSourceProperties.class)
public class MyTcpSourceConfiguration {
@Autowired
private Source channels;
@Autowired
private TcpSourceProperties properties;
@Bean
public TcpReceivingChannelAdapter adapter(
@Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannelName("toMyProcessor");
return adapter;
}
@ServiceActivator(inputChannel = "toMyProcessor", outputChannel = Source.OUTPUT)
public byte[] myProcessor(byte[] fromTcp) {
...
}
@Bean
public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory(
@Qualifier("tcpSourceDecoder") AbstractByteArraySerializer decoder) throws Exception {
TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
factoryBean.setType("server");
factoryBean.setPort(this.properties.getPort());
factoryBean.setUsingNio(this.properties.isNio());
factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
factoryBean.setLookupHost(this.properties.isReverseLookup());
factoryBean.setDeserializer(decoder);
factoryBean.setSoTimeout(this.properties.getSocketTimeout());
return factoryBean;
}
@Bean
public EncoderDecoderFactoryBean tcpSourceDecoder() {
EncoderDecoderFactoryBean factoryBean = new EncoderDecoderFactoryBean(this.properties.getDecoder());
factoryBean.setMaxMessageSize(this.properties.getBufferSize());
return factoryBean;
}
}
好吧,我確實成功創建聚集到internaly水槽所產生的消息由TCP來源。但是,當我嘗試將我的接收器更改爲處理器時。我收到一條錯誤消息:「org.springframework.cloud.stream.app.tcp.source.TcpSourceConfiguration中的字段通道需要單個bean,但找到了2個:1-源2-處理器:以編程方式註冊的單例」。你知道解決這個問題嗎? – rattek
是的,這是一個問題;我已經打開了一個[問題,如果你想跟蹤](https://github.com/spring-cloud/spring-cloud-stream/issues/819)一個解決方案將實現自己的源代碼 - 複製' TcpSourceConfiguration'並直接在源代碼中添加處理。我會用細節編輯我的答案。 –
對不起,但上面的代碼似乎不工作。謝謝你的幫助! – rattek