2012-07-25 46 views
2

我有一個任務隊列和一個線程,在幾秒鐘內窺探一次隊列,並且它有一個任務執行它。等待未知數量的異步任務

我有另一個代碼段(當然是另一個線程),它在循環中創建任務(我無法預先從循環外部知道任務數量)並將它們插入到隊列中。任務包含一些「結果」對象,外部線程(創建這些任務)需要等待所有任務完成並最終從每個任務中獲取結果。 問題是我無法將java Semaphore \ CountDownLatch等傳遞給結果對象,因爲我不知道提前顯示器的數量。 我也不能使用使用invokeAll的Executor,或者等待Future對象,因爲任務是不同步的(外部線程只是將任務加入到隊列中,而另一個線程在他有空時執行任務)。

我唯一想到的解決方案是創建一些「反轉信號量」類,它包含一組結果和一個監視器計數器。該功能的getResult將檢查計數器== 0,如果答案是肯定就會通知一些鎖定的對象,以及功能的getResult會等待這個鎖:

public class InvertedSemaphore<T> { 
    Set<T> resultSet; 
    int usages; 
    final Object c; 

    public InvertedSemaphore() { 
     resultSet = Collections.synchronizedSet(new HashSet<T>()); 
     usages = 0; 
     c = new Object(); 
    } 

    public void addResult(T result) { 
     resultSet.add(result); 
    } 

    public void addResults(Set<T> result) { 
     resultSet.addAll(result); 
    } 


    public void acquire() { 
     usages++; 
    } 

    public void release() { 
     synchronized (c) { 
      if (--usages == 0) { 
       c.notify(); 
      } 
     } 
    } 

    public Set<T> getResults() { 
     synchronized (c) { 
      try { 
       while (usages > 0) { 
        c.wait(); 
       } 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     return resultSet; 
    } 

} 

每個addTask方法調用semaphore.acquire,和每個(未同步)任務將在任務結束時調用semaphore.release。

這聽起來很複雜,我很確定在java併發庫中有更好的解決方案。

任何想法將appriciated :)

+0

聽起來像你需要'CountUpLatch'。:) – corsiKa 2012-07-25 17:17:53

回答

3

如果任務不需要爲了進行處理,使用ExecutorCompletionService

更一般地說,沒有必要爲了上ExecutorService使用invokeAll得到Future的結果。 ExecutorService#submit可以用於此目的,或者可選地,創建的任務本身可以實現Future,從而允許任務的創建者在稍後的時間點詢問結果。

一些代碼:

class MyTask { 
    AtomicReference<?> result = new AtomicReference<?>(); 

    void run() { 
     //do stuff here 
     result.set(/* the result of the calculation */); 
    } 

    boolean resultReady() { 
     return result.get()!=null; 
    } 

    ? get() { 
     return result.get(); 
    } 
} 

...別處代碼

void createTasks() { 
    Collection<MyTask> c = new ...; 

    while(indeterminable condition) { 
     MyTask task = new MyTask(); 
     c.add(task); 
     mysteryQueue.add(task); 
    } 

    while(haven't received all results) { 
     MyTask task = c.get(...); //or iterate or whatever 
     ? result = task.get(); 
     if (result!=null) { 
      //do stuff, probably remove the task from the collection c would be smart 
     } 
    } 
} 
+0

但是執行程序的問題在於執行程序將執行的'任務'只是將實際任務放入某個隊列中 - 並且這些任務將以不同的方式以不同的方式執行。我需要等待'真正'的任務結果,所以如果結果已經存在,我確實可以檢查一次,但我甚至不知道是否會有結果。我必須知道何時完成所有異步任務,然後才能獲得結果。 – axelrod 2012-07-25 17:27:39

+0

那真是愚蠢。但是它並不妨礙你自己實現'Future'或者通過「真實任務」的界面提供某種方式來獲得結果 – 2012-07-25 17:38:42

1

一個想法是使用一個單獨的隊列結果。
所以你將有一個阻塞隊列,線程A爲線程B放置任務,從而具有生產者 - 消費者的方法,並且當每個任務完成時,結果可以被放置在的第二個結果隊列中,將消費者 - 生產者角色因爲現在線程A最初創建的任務將消耗第二個隊列的結果。

+0

是的,但是我需要對結果進行一些操作,我不會知道當結果隊列將被視爲「完整」時。 – axelrod 2012-07-25 17:40:52

+0

不確定你的意思。線程'A'知道它已經在線程'B'的隊列中插入了多少個任務。現在只要在結果隊列中有一個任務消耗它,就會線程'A'。如果沒有任務並且還沒有消耗完所有結果(它知道它放在第一個隊列上多少,所以'A'知道應該消耗多少),它要麼等待結果要麼進行其他處理 – Cratylus 2012-07-25 17:57:38

+0

@axelrod,是的,你可以,就像你在反向信號燈的想法中檢查櫃檯一樣。有些東西會在那裏放置一個初始值並且東西會遞減 - 這意味着您知道如何確定您可以根據完成一定數量的任務繼續執行其他一些執行。 – Vitaliy 2012-07-25 17:58:47

1

您可以執行以下操作: 每個生產者都將擁有自己的隊列。生產者將通過一種手段將此隊列報告給任務本身。當任務完成運行時,它將把結果排隊到這個隊列中。它是由一些代碼描述的獸:

class Result{} 

interface IResultCallback{ 
    void resultReady(Result r); // this is an abstraction of the queue 
} 

class Producer implements IResultCallback{ 
    // the producer needs to pass itself to the constructor of the task, 
    // the task will only see its "resultReady" facade and will be able to report to it. 
    // the producer can aggragte the results at it will and execute its own computation as 
     // as soon it is ready 

    Queue<Result> results; // = init queue 

    @Override 
    public void resultReady(Result r) { 
     results.add(r); 

     if(results.size() == 9){ 
      operate(); 
     } 
     results.clear(); 
    } 

    public void operate(){ 
     // bla bla 
    } 
} 

public class Task { 
    IResultCallback callback; 

    public Task(IResultCallback callback){ 
     this.callback = callback; 
    } 
    public void execute(){ 
     // bla bla 

     Result r = null; // init result; 
     callback.resultReady(r); 
    } 
} 
+0

我會試試這個方法,thx :) – axelrod 2012-07-25 18:41:09

+0

@axelrod祝你好運!別忘了成爲StackOverflow的優秀代表,並投票/接受答案:-) – Vitaliy 2012-07-25 18:49:08

+0

@axelrod歡迎登陸!它*是一個很好的問題! – Vitaliy 2012-07-25 18:55:49