2013-02-08 20 views
16

我需要同時處理某個Collection實例中的元素。 換句話說,而不是一個迭代Collection實例如何在Java中同時處理Collection中的元素

for (Someclass elem : coll){ 
    process(elem); 
} 

我想同時處理這些元素。說,像ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)。另外,許多併發線程應該被修復。

任何靈活的模式已經存在?

+1

將它們添加到隊列中,創建固定的線程池執行程序,從隊列中取出進程並放到其他某個隊列中。 –

+0

在C#中它將是一個單行:「並行」。ForEach(coll,process);' –

回答

8

在一個名爲MyRunnable的類中創建一個run()方法,實現Runnable,其構造函數以elem作爲輸入並將其作爲實例變量存儲。然後使用:

ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); 
for (Someclass elem : coll){ 
    Runnable worker = new MyRunnable(elem); 
    executor.execute(worker); 
} 
0

我不知道這方面的任何模式,但作爲一個理念,你可以devide上的線程數您的收藏元素,讓每個線程X元素來處理,例如:

收藏有20個元素,youc所有的功能提供了4個線程,然後實習生開始他們想:

thread1 gets the elements [0 .. 4] 
thread2 gets the elements [5 .. 9] 
thread3 gets the elements [10 .. 14] 
thread1 gets the elements [15 .. 19] 

注意,從集合刪除元素可能會導致問題,那麼特殊螺紋4名試圖訪問元素[19],而有c中少於20個元素ollection。

EDIT

如腦提到取決於元素的處理時間,這種想法也可以是不effecient彷彿處理第一5個元件中的一個用了10秒,但在其它元件只用了0.5秒然後thread1會很忙,但其他線程最終不會並行運行很長時間。

+0

我認爲這比其他兩個解決方案弱,因爲處理不同的元素可能需要不同的時間長度。例如。如果處理前5個元素中的一個需要10秒鐘,但其他元素只需要0.5秒,那麼thread1將很忙,但其他線程最終不會並行運行很長時間。 – brain

+0

@brain這是一個很好的觀點,我會寫在答案中,謝謝。 – CloudyMarble

8

一個很好的解決辦法是:

  1. 實例含有的元素來處理
  2. 一個ArrayBlockingQueue實例化一個ExecutorService以執行處理同時
  3. 實例化Runnable小號給他們ArrayBlockingQueue作爲參數
  4. 執行run方法:當隊列中存在元素時,輪詢並處理它們
  5. 提交您的Runnable S到ExecutorService

代碼:

BlockingQueue<Someclass> toProcess = 
    new ArrayBlockingQueue<Someclass>(coll.size(), false, coll); 
ExecutorService es = Executors.newFixedThreadPool(numberOfThreads); 
for(int count = 0 ; count < numberOfThreads ; ++c) { 
    es.submit(new MyRunnable(toProcess)); 
} 


private static class MyRunnable() implements Runnable { 
    private final BlockingQueue<Someclass> toProcess; 

    public MyRunnable(BlockingQueue<Someclass> toProcess) { 
     this.toProcess = toProcess; 
    } 

    @Override 
    public void run() { 
     Someclass element = null; 
     while((element = toProcess.poll()) != null) { 
      process(element); 
     } 
    } 
} 
+0

我認爲這相當於斯坦尼亞人的回答。它只是使用executorservice作爲任務隊列,而不是創建一個明確的隊列。如果元素需要添加到隊列中,則更好。 – brain

+0

與run()內即將被內聯的進程()相比,直接調用進程()不會有很大的性能差異。當有足夠的內核或更糟糕的套接字時,縮放會受到隊列實現的限制。所以你想要確保每個隊列的輪詢或採取值得大約100或更多μs的工作。 Vakimshaars示例只有在開始線程之前所有工作都在那裏纔會更好。就像這裏一樣。 –

2

下面這樣執行類的 「手工製作」 的版本。請注意,您不得不通過Callable(或Runnable)的實例,但此類處理器類的類名。

public class ConcurrentCollectionExecutor<T> { 

private Collection<T> collection; 
private Class<Runnable> processor; 
private int numberOfThreads; 
private Executor executor; 

public ConcurrentCollectionExecutor(Collection<T> collection, Class<Runnable> processor, int numberOfThreads) { 
    this.collection = collection; 
    this.processor = processor; 
    this.numberOfThreads = numberOfThreads; 
    this.executor = Executors.newFixedThreadPool(numberOfThreads); 
} 

public void run() { 
    try { 
     Constructor<Runnable> constructor = null; 
     for (T t : collection) { 
      if (constructor == null) { 
       constructor = processor.getConstructor(t.getClass()); 
      } 
      executor.execute(constructor.newInstance(t)); 
     } 
    } catch (Exception e) { 
     throw new RuntimeException(e); 
    } 
}  
} 
+1

ey @Andremoniy,我研究了你的代碼,我發現它非常有趣......但是我想添加一些東西來擦亮它有點....如果你允許我,請... 1)可運行的實現必須具有構造函數,該構造函數將與數組的組件類型相同類型的元素作爲唯一參數。 2)你應該添加和運行結束'executor.shutdow()'3)實例字段numberOfThreads沒有被使用。希望能貢獻和問候! – Victor

相關問題