2013-11-03 62 views
0

我正在評估春天和它的生態系統爲新項目如此裸露與我。我希望我的應用程序對來自數千個不同客戶端的UDP事件做出反應。每個客戶端每隔5-10s發送1-10個UDP數據包。每個數據包都將被處理得非常快(主要在內存和小型計算中,只有偶爾的數據庫調用纔會幫助使用redis)。將不會有數據返回給調用者。UDP處理器與反應器在春天

我在Spring中實現了Reactor,就像他們在wiki上描述的一樣。 然後我實現了像Spring集成文檔中描述的UDP入站通道。下面是配置:

<int-ip:udp-inbound-channel-adapter id="receiverChannel" 
            channel="stringConvert" 
            port="9000" 
            multicast="false" 
            check-length="false" 
            pool-size="10" 
            lookup-host="false" 
     /> 

<int:transformer id="convertChannel" 
       input-channel="stringConvert" 
       output-channel="toProcess" 
       ref="transformer" 
       method="transform" 

     /> 

<int:service-activator input-channel="toProcess" 
         ref="accumulator" 
         method="accumulate"/> 

<bean id="accumulator" class="hello.UDPAccumulator" /> 
<bean id="transformer" class="hello.UDPTransformer" /> 

然後在UDPAccumulator我發佈消息反應器:

@Service 
public class UDPAccumulator { 

@Autowired 
ReactorProducer producer; 

public void accumulate(String quote) { 
    producer.fireEvent(quote); 

} 

}

這是「正確」的方式做到這一點,就我想高吞吐量? int-ip:udp -inbound-channel-adapter的內部工作是什麼?在將消息傳遞給反應堆之前它可以成爲瓶頸嗎?我看到反應堆有一些TCP相關的類和支持,但沒有UDP。任何有關如何以最好的方式做到這一點的建議,我們感激不盡!

獎金問題。如果消息比分派到反應堆的速度更快,該怎麼辦?請問redis message store(文章底部)有幫助嗎?如果我在反應堆中處理這些數據包的方法很慢,該怎麼辦?

回答

2

由於我們在Reactor中沒有直接的UDP支持,所以將事件發佈到Reactor中的抽象是非常明智的。但是你在你的「獎勵問題」中注意到發行商/消費者吞吐量存在問題,必須以某種特定領域的方式進行管理;那裏沒有銀彈。

在你的用例中,我實際上很想說Processor[1]可能更適合。它爲數據處理提供了更高的整體吞吐量,因爲它避免了在普通的Reactor中發生的基於動態選擇器的調度。除非您根據一些主題標準將傳入的事件分派給不同的處理程序,否則我建議您考慮一下。隨着吞吐量的增加,您不必擔心消費者會跟不上(除非您的Consumer正在做一些非常緩慢的事情,這些事情無法自動加速)。

但是,如果你真的,真的需要管理積壓,我建議通過Queue解耦您的生產者和消費者。Reactor有一個PersistentQueue[2]抽象,您可以使用JavaChronicle [3]發佈對象並保存到磁盤,然後可以使用Poller排空Consumer(javadoc在本週的某一時間即將發佈,因爲我們已準備好了1.0版本...它以前被稱爲Pipe[4])。

1

我不能和Reactor說話,但UDP適配器有一個專用的線程,它讀取原始數據包並將它們傳遞給TaskExecutor。它儘快做到這一點,因此它可以回到讀下一個數據包。

默認TaskExecutor是固定線程池。

反應器有一個DispatcherTaskExecutor可以注入適配器。

相同的任務執行器用於主要讀取器線程和切換。