2014-11-05 80 views
0

使用Camel拆分ArrayList並且並行處理每個項目最多10個線程。 以下是配置。 線程池配置文件設置爲最大線程數= 10。Camel Splitter並行處理陣列列表 - 併發訪問問題

<camel:route id="ReportsRoute"> 
     <camel:from uri="direct:processReportsChannel" /> 
     <camel:to uri="bean:reportRepository?method=getPendingTransactions" /> 
     <camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile"> 
      <camel:simple>${body}</camel:simple> 
      <camel:doTry> 
       <camel:to uri="direct:processReportChannel" /> 
       <camel:doCatch> 
        <camel:exception>java.lang.Exception</camel:exception> 
        <camel:handled> 
         <camel:constant>true</camel:constant> 
        </camel:handled>       
        <camel:to uri="bean:ReportRepository?method=markAsFailed"/> 
        <camel:wireTap uri="direct:loggingAndNotificationChannel" /> 
       </camel:doCatch> 
      </camel:doTry> 
     </camel:split> 
    </camel:route> 

bean:reportRepository?method=getPendingTransactions得到ArrayList和傳遞到分配器。

processReportChannel是處理項目的處理器。

問題: 它在作業開始時啓動10個線程,但某些線程正在拾取相同的項目。例如,如果我在ArrayList,thread_no_1和thread_no_2中有item_no_1到10,或者有更多線程正在接收,那麼讓我們說item_no_2。是否因爲Array List不是線程安全的,Splitter不管理它?

我不是這方面的專家,需要幫助指出問題所在。

回答

0

我用下面的(簡單)安裝測試:

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
    <route id="ReportsRoute"> 
     <from uri="direct:start" /> 
     <!-- By default a pool size of 10 is used. --> 
     <split parallelProcessing="true"> 
      <simple>${body}</simple> 
      <to uri="direct:sub" /> 
     </split> 
    </route> 
    <route> 
     <from uri="direct:sub"/> 
     <log message="Processing item ${body}" /> 
    </route> 
</camelContext> 

測試:

List<Object> list = new ArrayList<>(); 
for (int i = 0; i < 1000; i++) { 
    list.add("And we go and go: " + (i + 1)); 
} 
template.sendBody("direct:start", list); 

在此設置的條目被處理兩次。所以在你的處理器中必須有一些導致這個問題的東西,即同一個列表項被多個線程拾取。

+0

有沒有什麼辦法可以在進行並行處理之前進行聚合? – 2017-02-23 09:14:12