2012-05-09 63 views
4

我正在分析的代碼使用Netty NioDatagramChannelFactory創建UDP服務器。 它創建了一個線程池:線程在Netty UDP服務器中不同時執行

ExecutorService threadPool = Executors.newCachedThreadPool(); 

則數據報通道,pipelineFactory &自舉:

int workerCount = 10; 
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount); 
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(); 

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory); 
bootStrap.setPipelineFactory(pipelineFactory); 
bootStrap.bind(new InetSocketAddress(host, port)); 

在pipelineFactory中,getPipeline()增加了對定製處理器。

就像有人說的: Multi-threaded Handling of UDP Messages

只有一個線程處理接收到的消息。在日誌中,線程名稱顯示爲新的I/O數據報工作者#1像:

2012-04-20 09:20:51,853新的I/O數據報工作者#1' - '1 INFO [cemrsh SNMPTrapsRequestHandler:42] messageReceived |處理:V1TRAP [reqestID = 0,...]

我閱讀文檔和該條目:Lot of UDP requests lost in UDP server with Netty

然後我根據這些條目改變一個位的代碼。

int corePoolSize = 5; 
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576); 

並擁有和ExecutionHandler的pipelineFactory:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool); 
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler); 

而且getPipeline()將處理程序等記載: 現在線程池與創建

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory { 

    private ExecutionHandler executionHandler = null; 

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
     this.executionHandler = executionHandler; 
    } 

    @Override 
    public ChannelPipeline getPipeline() throws Exception { 

     ChannelPipeline pipeline = Channels.pipeline(); 
     pipeline.addFirst("ExecutorHandler", executionHandler); 

     // Here the custom handlers are added 
     pipeline.addLast(...) 
    } 

現在,我在日誌中獲得4個不同的線程名稱。他們將顯示爲池2線程1池2線程2,等等

例如:

2012-05-09 09:12:19589 pool- 2-thread-1 INFO [cemrshSNMPTrapsRequestHandler:46] messageReceived |正在處理:V1TRAP [reqestID = 0,...]

但它們不會同時處理。 messageReceived()下的處理必須在一個線程上完成,以處理下一個消息。 我從不同的客戶端發送了一堆消息到服務器,我得到的日誌不是交錯的。我也試着在messageReceived()裏面使用Thread.sleep(),並確認了之前的內容。

我錯過了什麼嗎? 有沒有辦法用Netty實現一個REAL多線程UDP服務器? 如何獲得不同的線程同時執行messageReceived()?

+0

如果我沒有弄錯,那麼OrderedMemoryAwareThreadPoolExecutor執行來自同一個線程中同一客戶端的請求。 – kofemann

回答

0

跳到我身上的一件事是,你把你的執行處理程序第一個在流水線中。我相信目的是整個管道直到「應用程序」處理程序都應該由執行IO解碼的IO線程執行。

因此,我會斷言你會首先添加所有的SNMPTrap解碼處理程序,然後,當你有一個真正的SNMPTrap時,它會被傳遞給執行處理程序,然後該處理程序將陷阱傳遞給實際消費者的陷阱做一些有用的事情。

@Override 
public ChannelPipeline getPipeline() throws Exception { 

    ChannelPipeline pipeline = Channels.pipeline(
     new SomethingSomethingDecoder(), 
     new SNMPTrapDecoder(), 
     executionHandler. 
     snmpTrapConsumerHandler 
    ); 
} 

至少,這是它是如何在ExecutionHandler javadoc中所示,和上面的是我對它的解釋。

+0

是的。我嘗試了這種方法,這是文檔中描述的方法,並沒有奏效。然後我試着用addFirst()添加executionHandler,就像在問題的回答中所描述的那樣:在Netty的UDP服務器中丟失了大量的UDP請求。但他們都沒有工作。 – nephewtom

0

根據我的經驗和我對UDP的Netty的理解,通常只有一個線程處理解碼的UDP消息。由於UDP是無會話的,只有一個線程可以在一個UDP端口上接收數據並對其進行解碼。一旦將數據解碼並將其包裝到緩衝區或特定的java對象中,然後可以將該對象放入將處理它的線程池(執行處理程序 - >您的業務處理程序)中。然後,一旦將以前解碼的數據釋放到執行處理程序中,就可以對UDP端口上即將發佈的新數據進行解碼。

創建NioDatagramChannelFactory時可以指定的池的線程僅在偵聽多個端口上的數據時使用。每個端口只有一個線程是有意義的。即使您在該構造函數中指定了100個工作者,如果您配置了一個UDP端口,也只會使用一個。

+0

看來你是對的,每個端口只有一個線程。 但對我來說,它沒有任何意義... 看來這只是netty的設計方式。 我不得不添加我的「線程代碼」來表現這種行爲,當我使用一個框架來處理具有數萬億選項的線程池時... 我不敢相信它...... – nephewtom

+0

你是否想關注那麼你的代碼作爲答案呢?我同意你的看法,即使只有一個接收端口,也可能有很多來源,所以多線程不僅有意義,而且對於爲大量客戶提供服務似乎也是必要的。 – gsimard