2012-11-20 282 views
0

我創建了一些工作流如何等待我創建的所有線程。這個例子在99%的情況下工作,但有時候方法waitForAllDone會盡快完成,然後所有線程都完成。我知道這是因爲waitForAllDone後,我將結束其使用創建的線程流這樣的話會出現異常如何等待所有線程完成

Caused by: java.io.IOException: Stream closed 

我的線程開始:

@Override 
    public void run() { 
    try { 
     process(); 
    } finally { 
     Factory.close(this); 
    } 
    } 

收盤:

protected static void close(final Client client) { 
    clientCount--; 
    } 

當我創建線我稱之爲:

public RobWSClient getClient() { 
    clientCount++; 
    return new Client(); 
    } 

和廠內clientCount變量:

private static volatile int clientCount = 0; 

等待:

public void waitForAllDone() { 
    try { 
     while (clientCount > 0) { 
     Thread.sleep(10); 
     } 

    } catch (InterruptedException e) { 
     LOG.error("Error", e); 
    } 
    } 
+0

查看java中的信號量文檔。該示例幾乎相同,您需要:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html –

回答

5

你需要通過​​保障clientCount修改和閱讀。主要問題是clientCount--clientCount++不是原子操作,因此兩個線程可能執行clientCount--/clientCount++並最終得到錯誤的結果。

只要使用volatile,如上所述,只有在該字段上的所有操作都是原子的情況下才會工作。由於它們不是,你需要使用一些鎖定機制。正如安東所說,AtomicInteger是一個很好的選擇。請注意,它應該是finalvolatile以確保它不是線程本地的。

也就是說,Java 1.5的一般規則是使用ExecutorService而不是Threads。與番石榴的Futures類可以讓等待所有一起選擇使用這個完成是簡單的:

Future<List<?>> future = Futures.successfulAsList(myFutureList); 
future.get(); 
// all processes are complete 

Futures.successfulAsList

+0

但我只有一個線程正在創建其他線程。那麼還有誰能夠增加這個價值呢? – hudi

+0

只要您採取了適當的保護措施,就可以從任意數量的線程增加值。 –

+0

好吧,有沒有選擇如何測試它? – hudi

3

我不知道你的你的代碼的其餘部分沒有問題,但你不能像這樣增加volatile變量 - clientCount++;使用AtomicInteger而不是

+0

爲什麼我不能增加它。你能發表一些文件嗎? – hudi

+0

導致它不是原子操作。在A = A + 1的A值可能已被另一個線程更改 – Anton

+4

@ hudi考慮從亞馬遜訂購http://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601 - 幾乎不可能用試錯法編寫一個線程安全的代碼 –

1

等待線程終止的最好方法是使用其中一個高級併發設施。 在這種情況下,最簡單的方法是使用ExecutorService。

你會「報價」的新任務,以這種方式執行:

... 
ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE); 
... 

Client client = getClient(); //assuming Client implements runnable 
executor.submit(client); 
... 

public void waitForAllDone() { 
    executor.awaitTermination(30, TimeUnit.SECOND) ; wait termination of all threads for 30 secs 
... 
} 

這樣一來,你就不會浪費在忙等待或睡眠/清醒週期寶貴的CPU週期。 有關詳細信息,請參閱ExecutorService文檔。