3

我的目標是按順序對異步事件進行後處理(發佈結果);這些事件按順序到達,但每個事件的處理時間不定。下面是我目前的實現,只使用了 wait 和 notify:MyThread 處理事件,按 id 將結果放入哈希表,並在 Scheduler 線程因等待該事件而阻塞時通知它,以便按順序發佈結果。(標題:按順序處理異步事件並發佈結果)

使用java.util.concurrent包實現此功能的方法會更好,更簡潔嗎?

import java.util.Map; 
import java.util.Random; 
import java.util.concurrent.ConcurrentHashMap; 


/**
 * Accepts events, processes each one on its own {@link MyThread}, and starts a
 * {@link Scheduler} thread that is handed the shared lock and result maps so it
 * can post-process results.
 *
 * <p>Both maps are {@link ConcurrentHashMap}s keyed by event id and are shared
 * with the worker threads and the scheduler thread.
 */
public class AsyncHandler {
    /** Per-event monitor objects, keyed by event id; registered in {@link #processEvent}. */
    private final Map<Integer, Object> locks = new ConcurrentHashMap<Integer, Object>();
    /** Finished results, keyed by event id. */
    private final Map<Integer, Result> results = new ConcurrentHashMap<Integer, Result>();
    /** Source of the simulated processing delay. */
    private static final Random rand = new Random();

    /** Creates the handler and immediately starts its scheduler thread. */
    public AsyncHandler() {
        new Scheduler(this, locks, results).start();
    }

    /**
     * Logs the event and starts a new worker thread that processes it.
     *
     * @param event the event to process
     */
    public void handleEvent(Event event) {
        System.out.println("handleEvent(" + event.id + ")");
        new MyThread(this, event, locks, results).start();
    }

    /**
     * Registers a monitor object for the event, simulates work by sleeping for
     * a random time below ten seconds, and returns a result carrying the
     * event's id.
     *
     * <p>If the calling thread is interrupted while sleeping, the exception is
     * printed, the thread's interrupt status is restored so that callers can
     * still observe the interruption, and a result is returned anyway.
     *
     * @param event the event to process
     * @return a new result with the same id as {@code event}
     */
    public Result processEvent(Event event) {
        System.out.println("processEvent(" + event.id + ")");
        locks.put(event.id, new Object());

        try {
            Thread.sleep(rand.nextInt(10000));
        } catch (InterruptedException e) {
            System.out.println(e);
            // Catching InterruptedException clears the flag; set it again
            // instead of silently dropping the interruption request.
            Thread.currentThread().interrupt();
        }

        return new Result(event.id);
    }

    /**
     * Final, ordered stage: prints the id of the given result.
     *
     * @param result the result to publish
     */
    public void postProcessEvent(Result result) {
        System.out.println(result.id);
    }

    /** Demo driver: submits 100 events with ids 0..99 in ascending order. */
    public static void main(String[] args) {
        AsyncHandler async = new AsyncHandler();

        for (int i = 0; i < 100; i++) {
            async.handleEvent(new Event(i));
        }
    }
}

/**
 * An incoming event, identified by an integer id.
 *
 * <p>The id is fixed at construction time; instances are handed to worker
 * threads, so the field is {@code final} to keep the object immutable.
 */
class Event {
    /** Identifier of this event. */
    final int id;

    /**
     * @param id identifier of the event
     */
    public Event(int id) {
        this.id = id;
    }
}

/**
 * The outcome of processing one event, identified by the id it was created
 * with.
 *
 * <p>Instances are exchanged between threads through a shared map, so the id
 * is {@code final} to keep the object immutable.
 */
class Result {
    /** Identifier this result was created with. */
    final int id;

    /**
     * @param id identifier to store in this result
     */
    public Result(int id) {
        this.id = id;
    }
}

/**
 * Worker thread that processes a single event, publishes its result in the
 * shared result map and wakes up any thread waiting on that event's monitor.
 */
class MyThread extends Thread {
    private final Event event;
    private final Map<Integer, Object> locks;
    private final Map<Integer, Result> results;
    private final AsyncHandler async;

    /**
     * @param async   handler whose {@code processEvent} does the actual work
     * @param event   the event this thread is responsible for
     * @param locks   shared map of per-event monitor objects, keyed by event id
     * @param results shared map of finished results, keyed by event id
     */
    public MyThread(AsyncHandler async, Event event, Map<Integer, Object> locks, Map<Integer, Result> results) {
        this.async = async;
        this.event = event;
        this.locks = locks;
        this.results = results;
    }

    /**
     * Processes the event, then publishes the result and notifies waiters.
     *
     * <p>The monitor is looked up <em>before</em> the result becomes visible.
     * Once a result is visible, the consumer may post-process it and remove
     * the monitor from {@code locks} at any moment; looking the monitor up
     * afterwards could therefore yield {@code null} and make
     * {@code synchronized} throw a {@link NullPointerException}.
     */
    @Override
    public void run() {
        Result res = async.processEvent(event);

        Object lock = locks.get(event.id);
        if (lock == null) {
            // No monitor registered for this id, so nobody can be waiting on
            // one; publishing the result is all that is left to do.
            results.put(event.id, res);
            return;
        }

        synchronized (lock) {
            // Publish and notify under the same monitor so that a waiter that
            // checks the result map while holding it cannot miss the update.
            results.put(event.id, res);
            lock.notifyAll();
        }
    }
}

/**
 * Thread that hands results to {@code AsyncHandler.postProcessEvent} in
 * ascending id order, starting at id 0, blocking on the per-event monitor while
 * the next result is not yet available.
 */
class Scheduler extends Thread {
    /** Id of the next result to post-process; only touched by this thread. */
    private int curId = 0;
    private final AsyncHandler async;
    private final Map<Integer, Object> locks;
    private final Map<Integer, Result> results;

    /**
     * @param async   handler whose {@code postProcessEvent} receives each result
     * @param locks   shared map of per-event monitor objects, keyed by event id
     * @param results shared map of finished results, keyed by event id
     */
    public Scheduler(AsyncHandler async, Map<Integer, Object> locks, Map<Integer, Result> results) {
        this.async = async;
        this.locks = locks;
        this.results = results;
    }

    /**
     * Loops forever: waits for the result with id {@code curId}, post-processes
     * it, removes its map entries and advances to the next id.
     *
     * <p>If the thread is interrupted while waiting, the exception is printed
     * and the JVM is terminated with exit status 1.
     */
    @Override
    public void run() {
        while (true) {
            Result res = results.get(curId);
            if (res == null) {
                Object lock = locks.get(curId);

                //TODO: eliminate busy waiting
                if (lock == null) {
                    // Neither a result nor a monitor exists for this id yet;
                    // spin until one of them shows up.
                    continue;
                }

                synchronized (lock) {
                    try {
                        // Guarded wait: re-check the condition while holding
                        // the monitor. This closes the window in which a
                        // worker could publish and notify between the check
                        // and wait() (which would block forever), and it
                        // also tolerates spurious wakeups.
                        while ((res = results.get(curId)) == null) {
                            lock.wait();
                        }
                    } catch (InterruptedException e) {
                        System.out.println(e);
                        System.exit(1);
                    }
                }
            }

            async.postProcessEvent(res);
            results.remove(curId);
            locks.remove(curId);
            curId++;
        }
    }
}
+0

您可以使用 [ConcurrentLinkedQueue](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html) 來處理傳入事件的併發(concurrency)和順序:啓動會產生 [Future](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Future.html) 結果的線程,並將這些 Future 放入隊列中 –

回答

2

是的,使用併發庫會簡單得多。

ExecutorService旨在包裝一個線程池和一個隊列,爲每個任務返回一個Future並提供等待結果的任何線程。

如果要按順序處理結果,請按提交順序依次處理這些 Future。

要按順序處理異步產生的結果,你可以這樣做:

/**
 * Demo of ordered result handling with two executors: a fixed pool of ten
 * threads runs the tasks, and a single-thread executor invokes the result
 * handlers one after another in submission order.
 */
public class Main {
    /**
     * Callback that receives the {@link Future} of a submitted task.
     *
     * @param <T> result type of the task
     */
    public interface ResultHandler<T> {
        void onFuture(Future<T> future) throws Exception;
    }

    /** Runs the submitted tasks. */
    private final ExecutorService pool = Executors.newFixedThreadPool(10);
    /** Runs the result handlers, one at a time. */
    private final ExecutorService result = Executors.newSingleThreadExecutor();

    /** Submits 1000 randomly sleeping tasks and then shuts both executors down. */
    public static void main(String[] args) {
        Main demo = new Main();
        int index = 0;
        while (index < 1000) {
            demo.submitTask(new RandomSleep(), new SleepReporter(index));
            index++;
        }
        demo.shutdown();
    }

    /**
     * Submits {@code callable} to the task pool and queues a job on the
     * single-thread executor that passes the resulting future to
     * {@code resultHandler}. The method is synchronized so that the two
     * submissions of one call are not interleaved with those of another call.
     *
     * @param callable      the task to run
     * @param resultHandler callback invoked with the task's future; any
     *                      exception it throws is printed via
     *                      {@code printStackTrace()}
     * @param <T>           result type of the task
     */
    public synchronized <T> void submitTask(Callable<T> callable, final ResultHandler<T> resultHandler) {
        Future<T> pending = pool.submit(callable);
        result.submit(new HandlerCall<T>(resultHandler, pending));
    }

    /** Initiates an orderly shutdown of the task pool and then of the handler executor. */
    public void shutdown() {
        pool.shutdown();
        result.shutdown();
    }

    /** Task that sleeps for {@code 2000^r} milliseconds (r random in [0,1)) and returns that duration. */
    private static final class RandomSleep implements Callable<Long> {
        @Override
        public Long call() throws Exception {
            long duration = (long) (Math.pow(2000, Math.random()));
            Thread.sleep(duration);
            return duration;
        }
    }

    /** Handler that prints a timestamp, the task index and the reported sleep time. */
    private static final class SleepReporter implements ResultHandler<Long> {
        private final int index;

        SleepReporter(int index) {
            this.index = index;
        }

        @Override
        public void onFuture(Future<Long> future) throws ExecutionException, InterruptedException {
            // The timestamp is taken before blocking on the future.
            Date stamp = new Date();
            Long slept = future.get();
            System.out.println(stamp + ": " + index + " - Slept for " + slept + " millis");
        }
    }

    /** Job for the handler executor: passes one future to one handler and prints any failure. */
    private static final class HandlerCall<T> implements Runnable {
        private final ResultHandler<T> handler;
        private final Future<T> pending;

        HandlerCall(ResultHandler<T> handler, Future<T> pending) {
            this.handler = handler;
            this.pending = pending;
        }

        @Override
        public void run() {
            try {
                handler.onFuture(pending);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

打印

Wed Oct 02 16:32:07 CEST 2013: 0 - Slept for 1 millis 
Wed Oct 02 16:32:07 CEST 2013: 1 - Slept for 1899 millis 
Wed Oct 02 16:32:09 CEST 2013: 2 - Slept for 32 millis 
Wed Oct 02 16:32:09 CEST 2013: 3 - Slept for 32 millis 
Wed Oct 02 16:32:09 CEST 2013: 4 - Slept for 214 millis 
Wed Oct 02 16:32:09 CEST 2013: 5 - Slept for 366 millis 
... many deleted ... 
Wed Oct 02 16:32:09 CEST 2013: 82 - Slept for 6 millis 
Wed Oct 02 16:32:09 CEST 2013: 83 - Slept for 1636 millis 
Wed Oct 02 16:32:10 CEST 2013: 84 - Slept for 44 millis 
Wed Oct 02 16:32:10 CEST 2013: 85 - Slept for 1 millis 

你可以看到,雖然有些任務比其他任務耗時更長,輸出的順序仍然是任務被添加的順序。你也可以看到它在同一秒內處理了許多任務(即併發執行)。

+0

用來保存結果的最佳數據結構是什麼?應該是某種優先級隊列嗎?我仍然不清楚如何保證發佈順序。 –

+0

@NikolayKuznetsov查看我給出的更新示例。 –

+1

非常感謝!所以爲了保證發佈順序,是把處理任務提交給 'result' 池,然後在其中調用會阻塞的 'Future.get',對吧? –

1

或者,您可以將 Future 放入隊列,而不是把後處理安排給單線程執行器。兩者的邏輯非常相似;單線程執行器內部也使用隊列,主要區別在於處理結果對象的方式。使用隊列可以讓最終處理階段寫成一個循環(類似於 AWT 事件處理的工作方式)。哪種方式更好,取決於這部分代碼所在的應用。

import java.util.Random; 
import java.util.concurrent.*; 

/**
 * Demo of ordered result handling with a bounded queue of futures: tasks run
 * on a fixed thread pool, their futures are queued in submission order, and a
 * consumer loop takes the futures one by one and waits for each result.
 */
public class InOrder
{
    private static final Random rand = new Random();

    /** A task identified by {@code id}; running it yields a {@link Result} with the same id. */
    final static class Event implements Callable<Result> {
        final int id;

        public Event(int id) {
            this.id = id;
        }

        @Override
        public Result call() throws InterruptedException {
            // arbitrary long computation
            Thread.sleep(rand.nextInt(10000));
            return new Result(id);
        }
    }

    /** Outcome of an {@link Event}, carrying the id it was created with. */
    final static class Result {
        int id;

        public Result(int id) {
            this.id = id;
        }
    }

    /** Id carried by the result of the {@link EndMarker}; tells the consumer loop to stop. */
    static final int STOP_ID = -1;
    private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
    /** Futures in submission order; bounded to 10 entries, so producers block when it is full. */
    private static final BlockingQueue<Future<Result>> QUEUE = new ArrayBlockingQueue<>(10);

    /**
     * Consumer loop: takes futures from the queue in order, waits for each
     * result and prints its id, returning once a result with
     * {@link #STOP_ID} is seen.
     *
     * @throws InterruptedException if interrupted while waiting on the queue or a future
     * @throws ExecutionException   if a task completed with an exception
     */
    static void processResults() throws InterruptedException, ExecutionException {
        for (;;) {
            Result r = QUEUE.take().get();
            if (r.id == STOP_ID) {
                return;
            }
            System.out.println("received result id=" + r.id);
        }
    }

    /**
     * Starts the consumer on the pool, submits 100 events while queueing their
     * futures in order, then queues the end marker and shuts the pool down.
     *
     * <p>If this thread is interrupted, submission stops early and the
     * interrupt status is restored before returning. If the end marker itself
     * could not be queued, the pool is shut down with {@code shutdownNow()} so
     * that the consumer, which would otherwise wait for a marker that never
     * arrives, is interrupted and the JVM can exit.
     */
    public static void main(String[] args)
    {
        POOL.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                processResults();
                return null;
            }
        });

        // Remember the interruption instead of re-interrupting right away:
        // with the flag set, the put() of the end marker below would fail
        // immediately.
        boolean interrupted = false;
        for (int id = 0; id < 100; id++) {
            try {
                QUEUE.put(POOL.submit(new Event(id)));
            } catch (InterruptedException ex) {
                interrupted = true;
                break;
            }
        }

        boolean markerQueued = false;
        try {
            QUEUE.put(new EndMarker());
            markerQueued = true;
        } catch (InterruptedException ex) {
            interrupted = true;
        }

        if (markerQueued) {
            POOL.shutdown();
        } else {
            // Without the marker the consumer never returns; interrupt it.
            POOL.shutdownNow();
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Already-completed future whose result carries {@link #STOP_ID}. */
    static final class EndMarker implements Future<Result> {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public Result get() {
            return new Result(STOP_ID);
        }

        @Override
        public Result get(long timeout, TimeUnit unit) {
            return get();
        }
    }
}