2017-02-20 56 views
0

我想使用Spring Cloud Stream App Starter TCP Source project(maven工件)以便能夠通過套接字/端口接收TCP消息,處理它們然後將結果推送到消息經紀人(如RabbitMQ)。如何使用Spring Cloud Stream應用程序啓動程序處理消息TCP

這個TCP源項目似乎正是我想要的,但它會自動將收到的消息發送到輸出通道。那麼,是否有一種乾淨的方式仍然使用TCP源項目,但攔截TCP傳入消息在內部轉換它們之前將它們輸出到我的消息代理?

回答

1

請參閱aggregation

您使用源和處理器創建聚合應用程序。

Spring Cloud Stream支持將多個應用程序聚合在一起,直接連接其輸入和輸出通道,避免了通過代理交換消息的額外成本。隨着春天的云溪的1.0版本,聚集僅支持以下類型的應用:

源,匯處理器...

它們可以通過創建互連應用的序列聚集在一起,在如果序列中元素的輸出通道連接到下一個元素的輸入通道(如果存在)。一個序列可以從一個源或一個處理器開始,它可以包含任意數量的處理器,並且必須以處理器或接收器結束。

編輯

作爲一個變通的源自動裝配的問題,你可以嘗試像...

@EnableBinding(Source.class) 
@EnableConfigurationProperties(TcpSourceProperties.class) 
public class MyTcpSourceConfiguration { 

    @Autowired 
    private Source channels; 

    @Autowired 
    private TcpSourceProperties properties; 

    @Bean 
    public TcpReceivingChannelAdapter adapter(
      @Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) { 
     TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
     adapter.setConnectionFactory(connectionFactory); 
     adapter.setOutputChannelName("toMyProcessor"); 
     return adapter; 
    } 

    @ServiceActivator(inputChannel = "toMyProcessor", outputChannel = Source.OUTPUT) 
    public byte[] myProcessor(byte[] fromTcp) { 
     ... 
    } 

    @Bean 
    public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory(
      @Qualifier("tcpSourceDecoder") AbstractByteArraySerializer decoder) throws Exception { 
     TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean(); 
     factoryBean.setType("server"); 
     factoryBean.setPort(this.properties.getPort()); 
     factoryBean.setUsingNio(this.properties.isNio()); 
     factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers()); 
     factoryBean.setLookupHost(this.properties.isReverseLookup()); 
     factoryBean.setDeserializer(decoder); 
     factoryBean.setSoTimeout(this.properties.getSocketTimeout()); 
     return factoryBean; 
    } 

    @Bean 
    public EncoderDecoderFactoryBean tcpSourceDecoder() { 
     EncoderDecoderFactoryBean factoryBean = new EncoderDecoderFactoryBean(this.properties.getDecoder()); 
     factoryBean.setMaxMessageSize(this.properties.getBufferSize()); 
     return factoryBean; 
    } 

} 
+0

好吧,我確實成功創建聚集到internaly水槽所產生的消息由TCP來源。但是,當我嘗試將我的接收器更改爲處理器時。我收到一條錯誤消息:「org.springframework.cloud.stream.app.tcp.source.TcpSourceConfiguration中的字段通道需要單個bean,但找到了2個:1-源2-處理器:以編程方式註冊的單例」。你知道解決這個問題嗎? – rattek

+1

是的,這是一個問題;我已經打開了一個[問題,如果你想跟蹤](https://github.com/spring-cloud/spring-cloud-stream/issues/819)一個解決方案將實現自己的源代碼 - 複製' TcpSourceConfiguration'並直接在源代碼中添加處理。我會用細節編輯我的答案。 –

+0

對不起,但上面的代碼似乎不工作。謝謝你的幫助! – rattek

相關問題