2017-01-30 97 views
2

我有一個彈簧應用程序,應該處理和存儲套接字傳入的數據,因爲瓶頸問題,這應該與多線程完成。春季併發處理多個隊列與單線程池

傳入數據屬於許多實體和每個實體的任務應該處理順序,但我認爲分配一個線程每個實體是不是一個很好的解決方案(千單一線程來處理每個實體的隊列)

那麼我怎樣才能定義一個公共ThreadPool來處理所有實體的隊列與票價算法?

回答

0

您可以使用Project reactorRxJava按組分組傳入消息,並按順序處理每個組中的事件。

隨着工程反應堆你可以的代碼如下所示:

Scheduler groupScheduler = Schedulers.newParallel("groupByPool", 16); 
    Flux.fromStream(incomingMessages()) // stream of new data from socket 
      .groupBy(Message::getEntityId) // split incoming messages by groups, which should be processed serially 
      .map(g -> g.publishOn(groupScheduler)) //create new publisher for groups of messages 
      .subscribe(//create consumer for main stream 
        stream -> 
          stream.subscribe(this::processMessage) // create consumer for group stream and process messagaes 
      ); 
+0

在您的示例代碼中,您正在爲每個組創建新的並行線程池,並將其指向爲我的問題中的一個錯誤解決方案。使用單個線程池處理所有帶有票價算法的任務是我的目標 – Mojtabye

+0

@Mojtabye同意,對不起這個。更新了答案 –

2

您已經描述了使用消息驅動體系結構解決的完美問題。

Spring Integration是爲您提供此功能的模塊。

您可以構建您的任務服務並使用@ServiceActivator進行註釋,並使用Channels創建您的鏈。

通道可以選擇在不同的線程池上執行,並且可以通過您的通道上的隊列設置來克服由於尖峯負載而導致的瓶頸。

絕對值得一試的查看Spring Integration的文檔。

+0

請提高通過添加源代碼的答案。 – Mojtabye