2010-12-04 29 views
4

歸結爲一個線程通過某種服務提交作業。作業在一些TPExecutor中執行。之後,此服務會在特定條件下檢查結果並在原始線程中拋出異常(作業超過最大重試次數等)。下面的代碼片段大致說明了此方案中的遺留代碼:如何使用CountDownLatch正確同步/鎖定

import java.util.concurrent.CountDownLatch; 

public class IncorrectLockingExample { 

private static class Request { 

    private final CountDownLatch latch = new CountDownLatch(1); 

    private Throwable throwable; 

    public void await() { 
     try { 
      latch.await(); 
     } catch (InterruptedException ignoredForDemoPurposes) { 
     } 
    } 

    public void countDown() { 
     latch.countDown(); 
    } 

    public Throwable getThrowable() { 
     return throwable; 
    } 

    public void setThrowable(Throwable throwable) { 
     this.throwable = throwable; 
    } 

} 

private static final Request wrapper = new Request(); 

public static void main(String[] args) throws InterruptedException { 

    final Thread blockedThread = new Thread() { 
     public void run() { 
      wrapper.await(); 
      synchronized (wrapper) { 
       if (wrapper.getThrowable() != null) 
        throw new RuntimeException(wrapper.getThrowable()); 
      } 
     } 
    }; 

    final Thread workingThread = new Thread() { 
     public void run() { 
      wrapper.setThrowable(new RuntimeException()); 
      wrapper.countDown(); 

     } 
    }; 

    blockedThread.start(); 
    workingThread.start(); 

    blockedThread.join(); 
    workingThread.join(); 
} 

}

有時,(在我的箱子不可複製的,但發生在16核服務器箱)異常沒有得到報告給原來的線程。我認爲這是因爲發生 - 之前不是強迫的(例如'countDown'發生在'setThrowable'之前)並且程序繼續工作(但應該失敗)。 我將不勝感激關於如何解決這種情況的任何幫助。 約束條件是:一週內發佈,需要對現有代碼庫的影響最小。

+0

250 KLOC項目在這裏完全是多線程的,工作在16核心等。我們使用像「CountDownLatch」*「lot **」這樣的「高級」多線程工具。我們使用低級別事物的次數,例如* Object *的wait *方法和* Thread *的join *方法? **零**。在我看來,現在在默認API中有足夠多的高級併發功能,您不需要重新創建基於Java idiosynchrasies的任何碎輪。 +1給Peter Lawrey的答案。 – SyntaxT3rr0r 2010-12-04 13:18:36

+0

@ Webinator:OP *在此處使用「高級」CountDownLatch工具來實現其設計目的之一。 – willjcroz 2010-12-04 13:38:38

+0

您確定上面的代碼不符合預期嗎?我認爲沒有理由不糾正之後。 – willjcroz 2010-12-04 13:46:04

回答

6

上面的代碼(現在已更新)應該按照您的預期工作,而不使用進一步的同步機制。通過使用CountDownLatchawait()countdown()方法來實施存儲屏障及其相應的'發生在'之前的關係。

API docs:一個成功的「獲取」方法,例如隨後的

操作之前爲「釋放」同步器的方法,如Lock.unlock,Semaphore.release,和CountDownLatch.countDown發生-前行動如另一個線程中的同一個同步器對象上的Lock.lock,Semaphore.acquire,Condition.await和CountDownLatch.await。

如果您定期處理併發讓自己的'Java Concurrency in Practice'副本,它是Java併發聖經,將是值得它的重量你的書架上:-)。

2

我懷疑你需要

private volatile Throwable throwable 

您是否嘗試過使用,因爲它是內置的,這是否給你一個ExecutorService。下面的打印

future1 := result 
future2 threw java.lang.IllegalStateException 
future3 timed out 

的代碼是

public static void main(String... args) { 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 
    Future<String> future1 = executor.submit(new Callable<String>() { 
     public String call() throws Exception { 
      return "result"; 
     } 
    }); 

    Future<String> future2 = executor.submit(new Callable<String>() { 
     public String call() throws Exception { 
      throw new IllegalStateException(); 
     } 
    }); 

    Future<String> future3 = executor.submit(new Callable<String>() { 
     public String call() throws Exception { 
      Thread.sleep(2000); 
      throw new AssertionError(); 
     } 
    }); 

    printResult("future1", future1); 
    printResult("future2", future2); 
    printResult("future3", future3); 
    executor.shutdown(); 
} 

private static void printResult(String description, Future<String> future) { 
    try { 
     System.out.println(description+" := "+future.get(1, TimeUnit.SECONDS)); 
    } catch (InterruptedException e) { 
     System.out.println(description+" interrupted"); 
    } catch (ExecutionException e) { 
     System.out.println(description+" threw "+e.getCause()); 
    } catch (TimeoutException e) { 
     System.out.println(description+" timed out"); 
    } 
} 

在爲FutureTask的代碼,有註釋。

/** 
* The thread running task. When nulled after set/cancel, this 
* indicates that the results are accessible. Must be 
* volatile, to ensure visibility upon completion. 
*/ 

如果您不打算重新使用在JDK的代碼,它仍然是值得一讀,這樣你可以在他們使用任何技巧回升。