2010-01-28 32 views
25

我有一個將異步任務委託給線程池的進程。我需要確保某些任務按順序執行。 因此,例如使用ExecutorService控制任務執行順序

任務按順序到達

任務A1,B1,C1,D1,E1,A2,A3,B2,F1

任務可以以任何順序,除非有一個被執行自然依賴,所以a1,a2,a3必須按照這個順序處理,或者分配給同一個線程,或者阻塞這些直到我知道前一個#任務完成。

目前它不使用Java Concurrency軟件包,但我正在考慮更改以利用線程管理。

有誰有類似的解決方案或如何實現這一

回答

2

建議當您提交RunnableCallableExecutorService回報你收到Future。讓依賴於a1的線程通過a1的Future並致電Future.get()。這將阻塞,直到線程完成。

所以:

ExecutorService exec = Executor.newFixedThreadPool(5); 
Runnable a1 = ... 
final Future f1 = exec.submit(a1); 
Runnable a2 = new Runnable() { 
    @Override 
    public void run() { 
    f1.get(); 
    ... // do stuff 
    } 
} 
exec.submit(a2); 

等。

+4

經過實施,我不認爲這將有一個固定的線程池工作,因爲線程可能()'一次和'f1.get所有塊陷入僵局。 – finnw 2010-01-28 10:37:02

+0

根據需要調整池的大小。 – cletus 2010-01-28 10:59:48

+0

或使用緩存的線程池。 – finnw 2010-01-28 11:10:35

2

另一種選擇是創建自己的執行程序,將其稱爲OrderedExecutor,並創建一個封裝的ThreadPoolExecutor對象數組,每個內部執行程序有1個線程。然後您提供一個機制,選擇內部對象之一,例如,你可以通過提供您的類的用戶可以實現一個接口,這樣做:

 
executor = new OrderedExecutor(10 /* pool size */, new OrderedExecutor.Chooser() { 
    public int choose(Runnable runnable) { 
    MyRunnable myRunnable = (MyRunnable)runnable; 
    return myRunnable.someId(); 
    }); 

executor.execute(new MyRunnable()); 

OrderedExecutor.execute(的實現),那麼將使用選擇器得到一個int,你用這個池大小來修改它,這就是你的索引到內部數組中。這個想法是「someId()」將爲所有「a」返回相同的值,等等。

12

當我在過去完成這項工作時,我通常已經通過一個組件處理了順序,然後提交可執行文件/可運行到Executor。

有點像。

  • 得到的任務列表運行,部分依賴
  • 創建執行人及與ExecutorCompletionService
  • 包裹搜尋所有任務,任何沒有依賴關係,通過完成服務
  • 投票安排他們完成服務
  • 由於每個任務完成
    • 將它添加到「已完成」列表
    • 重新評估與「完成列表」相關的任何等待任務,看他們是否「完全依賴」。如果直到所有的任務都提交,以便安排他們
    • 沖洗重複/完成

完成服務是能夠得到的任務,因爲他們完成,而不是試圖輪詢一堆的好方法期貨。但是,您可能希望保留一個Map<Future, TaskIdentifier>,當通過完成服務安排任務時填充這個Map<Future, TaskIdentifier>,以便當完成服務爲您提供完整的Future時,您可以確定它是哪個TaskIdentifier

如果你發現自己處於一個任務仍在等待運行的狀態,但沒有任何事情正在運行,並且沒有任何事情可以安排,那麼你就有循環依賴問題。

9

我編寫自己的執行程序,以確保任務具有相同密鑰的任務排序。它使用具有相同密鑰的訂單任務的隊列映射。每個鍵控任務使用相同的鍵執行下一個任務。

此解決方案不處理RejectedExecutionException或委託Executor的其他異常!所以委託Executor應該是「無限的」。

import java.util.HashMap; 
import java.util.LinkedList; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.Executor; 

/** 
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). 
*/ 
public class OrderingExecutor implements Executor{ 

    private final Executor delegate; 
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>(); 

    public OrderingExecutor(Executor delegate){ 
     this.delegate = delegate; 
    } 

    @Override 
    public void execute(Runnable task) { 
     // task without key can be executed immediately 
     delegate.execute(task); 
    } 

    public void execute(Runnable task, Object key) { 
     if (key == null){ // if key is null, execute without ordering 
      execute(task); 
      return; 
     } 

     boolean first; 
     Runnable wrappedTask; 
     synchronized (keyedTasks){ 
      Queue<Runnable> dependencyQueue = keyedTasks.get(key); 
      first = (dependencyQueue == null); 
      if (dependencyQueue == null){ 
       dependencyQueue = new LinkedList<Runnable>(); 
       keyedTasks.put(key, dependencyQueue); 
      } 

      wrappedTask = wrap(task, dependencyQueue, key); 
      if (!first) 
       dependencyQueue.add(wrappedTask); 
     } 

     // execute method can block, call it outside synchronize block 
     if (first) 
      delegate.execute(wrappedTask); 

    } 

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
     return new OrderedTask(task, dependencyQueue, key); 
    } 

    class OrderedTask implements Runnable{ 

     private final Queue<Runnable> dependencyQueue; 
     private final Runnable task; 
     private final Object key; 

     public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
      this.task = task; 
      this.dependencyQueue = dependencyQueue; 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      try{ 
       task.run(); 
      } finally { 
       Runnable nextTask = null; 
       synchronized (keyedTasks){ 
        if (dependencyQueue.isEmpty()){ 
         keyedTasks.remove(key); 
        }else{ 
         nextTask = dependencyQueue.poll(); 
        } 
       } 
       if (nextTask!=null) 
        delegate.execute(nextTask); 
      } 
     } 
    } 
} 
+0

+1。感謝那。我會使用這種植入方式,但我真的不知道這是不是標記爲問題的最終答案。 – 2014-05-26 13:44:50

0

Habanero-Java library,存在可用於表達任務之間的依賴關係,並避免線程阻塞操作數據驅動任務的概念。在封面下,Habanero-Java庫使用JDK的ForkJoinPool(即ExecutorService)。

例如,你的用例的任務A1,A2,A3,...可以表述如下:

HjFuture a1 = future(() -> { doA1(); return true; }); 
HjFuture a2 = futureAwait(a1,() -> { doA2(); return true; }); 
HjFuture a3 = futureAwait(a2,() -> { doA3(); return true; }); 

需要注意的是A1,A2和A3是類型HjFuture的對象只是引用並且可以在您的自定義數據結構中進行維護,以指定在運行時任務A2和A3進入時的依賴關係。

有一些tutorial slides available。 你可以找到更多的文件,如javadoc,API summaryprimers

0

您可以使用Executors.newSingleThreadExecutor(),但它將只使用一個線程來執行您的任務。另一個選擇是使用CountDownLatch。這裏有一個簡單的例子:

public class Main2 { 

public static void main(String[] args) throws InterruptedException { 

    final CountDownLatch cdl1 = new CountDownLatch(1); 
    final CountDownLatch cdl2 = new CountDownLatch(1); 
    final CountDownLatch cdl3 = new CountDownLatch(1); 

    List<Runnable> list = new ArrayList<Runnable>(); 
    list.add(new Runnable() { 
     public void run() { 
      System.out.println("Task 1"); 

      // inform that task 1 is finished 
      cdl1.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 1 is finished 
      try { 
       cdl1.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 2"); 

      // inform that task 2 is finished 
      cdl2.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 2 is finished 
      try { 
       cdl2.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 3"); 

      // inform that task 3 is finished 
      cdl3.countDown(); 
     } 
    }); 

    ExecutorService es = Executors.newFixedThreadPool(200); 
    for (int i = 0; i < 3; i++) { 
     es.submit(list.get(i)); 
    } 

    es.shutdown(); 
    es.awaitTermination(1, TimeUnit.MINUTES); 
} 
} 
0

我爲這個問題創建了一個OrderingExecutor。如果將同一個關鍵字傳遞給方法execute()和不同的可執行文件,那麼使用相同關鍵字執行runnables的順序將按照調用execute()的順序執行,並且永遠不會重疊。

import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.Executor; 

/** 
* Special executor which can order the tasks if a common key is given. 
* Runnables submitted with non-null key will guaranteed to run in order for the same key. 
* 
*/ 
public class OrderedExecutor { 

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
      new ConcurrentLinkedQueue<Runnable>()); 

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>(); 
    private Executor delegate; 
    private volatile boolean stopped; 

    public OrderedExecutor(Executor delegate) { 
     this.delegate = delegate; 
    } 

    public void execute(Runnable runnable, Object key) { 
     if (stopped) { 
      return; 
     } 

     if (key == null) { 
      delegate.execute(runnable); 
      return; 
     } 

     Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> { 
      v.add(runnable); 
      return v; 
     }); 
     if (queueForKey == null) { 
      // There was no running task with this key 
      Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>()); 
      newQ.add(runnable); 
      // Use putIfAbsent because this execute() method can be called concurrently as well 
      queueForKey = taskMap.putIfAbsent(key, newQ); 
      if (queueForKey != null) 
       queueForKey.add(runnable); 
      delegate.execute(new InternalRunnable(key)); 
     } 
    } 

    public void shutdown() { 
     stopped = true; 
     taskMap.clear(); 
    } 

    /** 
    * Own Runnable used by OrderedExecutor. 
    * The runnable is associated with a specific key - the Queue&lt;Runnable> for this 
    * key is polled. 
    * If the queue is empty, it tries to remove the queue from taskMap. 
    * 
    */ 
    private class InternalRunnable implements Runnable { 

     private Object key; 

     public InternalRunnable(Object key) { 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      while (true) { 
       // There must be at least one task now 
       Runnable r = taskMap.get(key).poll(); 
       while (r != null) { 
        r.run(); 
        r = taskMap.get(key).poll(); 
       } 
       // The queue emptied 
       // Remove from the map if and only if the queue is really empty 
       boolean removed = taskMap.remove(key, EMPTY_QUEUE); 
       if (removed) { 
        // The queue has been removed from the map, 
        // if a new task arrives with the same key, a new InternalRunnable 
        // will be created 
        break; 
       } // If the queue has not been removed from the map it means that someone put a task into it 
        // so we can safely continue the loop 
      } 
     } 
    } 

    /** 
    * Special Queue implementation, with equals() and hashCode() methods. 
    * By default, Java SE queues use identity equals() and default hashCode() methods. 
    * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). 
    * 
    * @param <E> The type of elements in the queue. 
    */ 
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> { 

     private Queue<E> delegate; 

     public QueueWithHashCodeAndEquals(Queue<E> delegate) { 
      this.delegate = delegate; 
     } 

     public boolean add(E e) { 
      return delegate.add(e); 
     } 

     public boolean offer(E e) { 
      return delegate.offer(e); 
     } 

     public int size() { 
      return delegate.size(); 
     } 

     public boolean isEmpty() { 
      return delegate.isEmpty(); 
     } 

     public boolean contains(Object o) { 
      return delegate.contains(o); 
     } 

     public E remove() { 
      return delegate.remove(); 
     } 

     public E poll() { 
      return delegate.poll(); 
     } 

     public E element() { 
      return delegate.element(); 
     } 

     public Iterator<E> iterator() { 
      return delegate.iterator(); 
     } 

     public E peek() { 
      return delegate.peek(); 
     } 

     public Object[] toArray() { 
      return delegate.toArray(); 
     } 

     public <T> T[] toArray(T[] a) { 
      return delegate.toArray(a); 
     } 

     public boolean remove(Object o) { 
      return delegate.remove(o); 
     } 

     public boolean containsAll(Collection<?> c) { 
      return delegate.containsAll(c); 
     } 

     public boolean addAll(Collection<? extends E> c) { 
      return delegate.addAll(c); 
     } 

     public boolean removeAll(Collection<?> c) { 
      return delegate.removeAll(c); 
     } 

     public boolean retainAll(Collection<?> c) { 
      return delegate.retainAll(c); 
     } 

     public void clear() { 
      delegate.clear(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (!(obj instanceof QueueWithHashCodeAndEquals)) { 
       return false; 
      } 
      QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj; 
      return Arrays.equals(toArray(), other.toArray()); 
     } 

     @Override 
     public int hashCode() { 
      return Arrays.hashCode(toArray()); 
     } 

    } 

}