我需要同時處理某個Collection實例中的元素。 換句話說,而不是一個迭代Collection實例如何在Java中同時處理Collection中的元素
for (Someclass elem : coll){
process(elem);
}
我想同時處理這些元素。說,像ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)
。另外,許多併發線程應該被修復。
任何靈活的模式已經存在?
我需要同時處理某個Collection實例中的元素。 換句話說,而不是一個迭代Collection實例如何在Java中同時處理Collection中的元素
for (Someclass elem : coll){
process(elem);
}
我想同時處理這些元素。說,像ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads)
。另外,許多併發線程應該被修復。
任何靈活的模式已經存在?
在一個名爲MyRunnable的類中創建一個run()方法,實現Runnable,其構造函數以elem作爲輸入並將其作爲實例變量存儲。然後使用:
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (Someclass elem : coll){
Runnable worker = new MyRunnable(elem);
executor.execute(worker);
}
我不知道這方面的任何模式,但作爲一個理念,你可以devide上的線程數您的收藏元素,讓每個線程X元素來處理,例如:
收藏有20個元素,youc所有的功能提供了4個線程,然後實習生開始他們想:
thread1 gets the elements [0 .. 4]
thread2 gets the elements [5 .. 9]
thread3 gets the elements [10 .. 14]
thread1 gets the elements [15 .. 19]
注意,從集合刪除元素可能會導致問題,那麼特殊螺紋4名試圖訪問元素[19],而有c中少於20個元素ollection。
EDIT:
如腦提到取決於元素的處理時間,這種想法也可以是不effecient彷彿處理第一5個元件中的一個用了10秒,但在其它元件只用了0.5秒然後thread1會很忙,但其他線程最終不會並行運行很長時間。
我認爲這比其他兩個解決方案弱,因爲處理不同的元素可能需要不同的時間長度。例如。如果處理前5個元素中的一個需要10秒鐘,但其他元素只需要0.5秒,那麼thread1將很忙,但其他線程最終不會並行運行很長時間。 – brain
@brain這是一個很好的觀點,我會寫在答案中,謝謝。 – CloudyMarble
一個很好的解決辦法是:
Runnable
小號給他們ArrayBlockingQueue作爲參數run
方法:當隊列中存在元素時,輪詢並處理它們Runnable
S到ExecutorService
代碼:
BlockingQueue<Someclass> toProcess =
new ArrayBlockingQueue<Someclass>(coll.size(), false, coll);
ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
for(int count = 0 ; count < numberOfThreads ; ++c) {
es.submit(new MyRunnable(toProcess));
}
private static class MyRunnable() implements Runnable {
private final BlockingQueue<Someclass> toProcess;
public MyRunnable(BlockingQueue<Someclass> toProcess) {
this.toProcess = toProcess;
}
@Override
public void run() {
Someclass element = null;
while((element = toProcess.poll()) != null) {
process(element);
}
}
}
我認爲這相當於斯坦尼亞人的回答。它只是使用executorservice作爲任務隊列,而不是創建一個明確的隊列。如果元素需要添加到隊列中,則更好。 – brain
與run()內即將被內聯的進程()相比,直接調用進程()不會有很大的性能差異。當有足夠的內核或更糟糕的套接字時,縮放會受到隊列實現的限制。所以你想要確保每個隊列的輪詢或採取值得大約100或更多μs的工作。 Vakimshaars示例只有在開始線程之前所有工作都在那裏纔會更好。就像這裏一樣。 –
下面這樣執行類的 「手工製作」 的版本。請注意,您不得不通過Callable
(或Runnable
)的實例,但此類處理器類的類名。
public class ConcurrentCollectionExecutor<T> {
private Collection<T> collection;
private Class<Runnable> processor;
private int numberOfThreads;
private Executor executor;
public ConcurrentCollectionExecutor(Collection<T> collection, Class<Runnable> processor, int numberOfThreads) {
this.collection = collection;
this.processor = processor;
this.numberOfThreads = numberOfThreads;
this.executor = Executors.newFixedThreadPool(numberOfThreads);
}
public void run() {
try {
Constructor<Runnable> constructor = null;
for (T t : collection) {
if (constructor == null) {
constructor = processor.getConstructor(t.getClass());
}
executor.execute(constructor.newInstance(t));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
ey @Andremoniy,我研究了你的代碼,我發現它非常有趣......但是我想添加一些東西來擦亮它有點....如果你允許我,請... 1)可運行的實現必須具有構造函數,該構造函數將與數組的組件類型相同類型的元素作爲唯一參數。 2)你應該添加和運行結束'executor.shutdow()'3)實例字段numberOfThreads沒有被使用。希望能貢獻和問候! – Victor
將它們添加到隊列中,創建固定的線程池執行程序,從隊列中取出進程並放到其他某個隊列中。 –
在C#中它將是一個單行:「並行」。ForEach(coll,process);' –