2014-05-17 44 views
1

我使用Spring集成TCP服務器,它保持連接到幾千客戶端。我需要服務器來限制客戶端在負載過重的情況下不會丟失消息。春季集成 - 可靠的TCP高容量應用程序

我的服務器配置:

<task:executor id="myTaskExecutor" 
    pool-size="4-8" 
    queue-capacity="0" 
    rejection-policy="CALLER_RUNS" /> 

<int-ip:tcp-connection-factory id="serverTcpConFact" 
    type="server" 
    port="60000" 
    using-nio="true" 
    single-use="false" 
    so-timeout="300000" 
    task-executor="myTaskExecutor" /> 

<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter" 
    channel="tcpInbound" 
    connection-factory="serverTcpConFact" /> 

<channel id="tcpInbound" /> 

<service-activator input-channel="tcpInbound" 
    ref="myService" 
    method="test" /> 

<beans:bean id="myService" class="org.test.tcpserver.MyService" /> 

由於連接工廠默認任務執行者是無界的,我用一個彙集任務執行,以防止內存不足的錯誤。

一個簡單的客戶端負載測試:

public class TCPClientTest { 
    static Socket socket; 
    static List<Socket> sl = new ArrayList<>(); 
    static DataOutputStream out; 

    public static void main(String[] args) throws Exception { 
     for (int i = 0; i < 10000; i++) { 
      socket = new Socket("localhost", 60000); 
      sl.add(socket); 
      out = new DataOutputStream(socket.getOutputStream()); 
      out.writeBytes("connection " + i + "\r\n"); 
      System.out.println("Using connection #" + i); 
     } 
     System.in.read(); 
    } 
} 

當我運行它,服務器只接收大約10-20消息,然後客戶端獲得「連接被拒絕:連接」異常。之後,即使在連接超時之後,服務器也不能再接受任何新的連接。增加池大小僅有助於獲得更多的消息。

編輯

我使用Spring集成3.0.2.RELEASE。對於生產,我使用8-40個線程,但它僅在幾百次連接之後纔會使此測試失敗。

MyService.test()並沒有做太多......

public class MyService { 
    public void test(byte[] input) { 
     System.out.println("Received: " + new String(input)); 
    } 
} 

Here is the log with trace level logging.

Sources

+1

什麼版本的Spring Integration? 'MyService.test()'做了什麼?由於你只是在每個套接字上發送一條短消息,所以我不希望這個測試用例有任何線程問題(儘管4-8個線程可能完全不適合具有這個套接字數量的實際應用程序)。我建議你打開服務器端的跟蹤級日誌記錄。 –

+0

@Gary Russell編輯我的問題。 – John29

回答

2

我看看有什麼問題,請打開JIRA issue

問題是在執行程序中帶有0長度隊列的CALLER_RUNS拒絕策略。

有一個線程可以處理所有IO事件(通常爲myTaskExecutor-1);當讀取事件觸發時,他會排隊執行以讀取數據;閱讀器線程排隊執行以組裝數據(這將阻塞,直到完成消息 - 在您的情況下由CRLF終止)到達)。

在這種情況下,當沒有線程可用時,CALLER_RUNS策略意味着IO選擇器線程進行讀取操作,併成爲彙編程序線程,這會阻止等待因爲被阻塞而不會到達的數據,在調度不同的線程後阻止讀取數據。因爲他被封鎖,他無法處理新的接受事件。

這裏是我的測試日誌顯示的問題...

TRACE: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioServerConnectionFactory - Port 60000 SelectionCount: 2 
DEBUG: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Reading... 
DEBUG: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Running an assembler 
TRACE: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Nio message assembler running... 
DEBUG: [May-18 10:43:38,926][myTaskExecutor-1] tcp.serializer.ByteArrayCrLfSerializer - Available to read:0 

第二行顯示選擇的線程被用來做讀取;他檢測到此套接字需要彙編程序,併成爲彙編程序,阻塞,等待數據。

你真的相信會有使用無限任務執行程序的問題嗎?這些事件通常非常短暫,因此線程將很快被回收。

將執行程序的隊列容量增加到0以上也應該有所幫助,但它不會完全保證問題不會發生(儘管大的隊列大小不太可能被擊中)。

我還不確定如何解決這個問題,除了爲IO選擇器和讀取器線程使用專用的任務執行程序以使它們永遠不會用作彙編程序。

+1

感謝您的快速響應和解釋。我認爲,在真實世界的場景中,使用無限任務執行程序不太可能會遇到問題,但是如果使用它,至少在與java.lang.OutOfMemoryError約4000個連接後出現相同的測試會崩潰。我認爲,無論負載如何,都可以運作。我會打開一個JIRA問題。 – John29

+0

我想寫一些像[CallerBlocksPolicy](https://gist.github.com/garyrussell/63038cdd886e00ef01b8)一段時間(以涵蓋其他用例),但從來沒有解決它。 8個線程仍然不足以滿足您的測試用例,因爲它試圖啓動彙編程序超時。然而,如同主題中顯示的80個線程,在創建近9000個連接之後,我剛剛殺死了你的測試並且仍然很強大。 –

+0

我試過了你的'CallerBlocksPolicy',但正如你所說,它仍然不能很好地處理8個線程。當你說它試圖啓動一個彙編程序時,它意味着什麼超時?我不介意增加線程的數量,但我不明白爲什麼8個線程仍然會導致測試失敗,即使我們不再使用'CALLER_RUNS'拒絕策略?當然,池中的線程數量應該會影響性能,但爲什麼它也會影響穩定性? – John29

0

昨天我寫了一個示例,只是爲了使用spring集成創建tcp高性能服務器代碼。我使用JMeter TCP採樣器對1000個併發客戶端請求進行了成功測試。

這裏是代碼 - https://github.com/rajeshgheware/spring-integration-samples包括JMeter測試配置文件。

我與具有英特爾核i5 M520 2.4GHz的在64位筆記本1000次併發TCP客戶端請求(在服務器代碼和JMeter測試此機器上運行)成功地測試

我也試圖與1500個併發客戶端請求,但觀察到該服務器無法履行許多請求。我將繼續嘗試增強此代碼以服務10000個併發客戶端請求(我知道我可能需要從亞馬遜獲得用於此測試的優秀EC2計算機:))