2014-01-16 67 views
5

如果我遇到類似以下情況:併發觀察模式

ObserverA,ObserverB,ObserverC都從AbstractObserver繼承。

創建觀察員名單:

List<AbstractObserver> list = new ArrayList<AbstractObserver>(); 
list.add(new ObserverA()); 
list.add(new ObserverB()); 
list.add(new ObserverC()); 

而且某種以下方法處理程序運行在「主」線程:

public void eat(Food item) { 
    for(AbstractObserver o : list) { 
      o.eatFood(item); 
    } 
} 

public void drink(Coffee cup) { 
    for(AbstractObserver o : list) { 
      o.drinkCoffee(cup); 
    } 
} 

我將如何設計一個系統,我可以運行每一種食物和喝咖啡的方法在不同線程中的觀察者?具體來說,當「MAIN」線程收到一個事件(喝或吃的方法被調用)時,我將如何在ObserverA,ObserverB和ObserverC中的自己的線程中運行eatFood或drinkCoffee方法?

我想爲每個AbstractObserver子類實例設置不同的線程,因爲目前我正在按順序通知每個觀察者,這可能會導致延遲。

+0

我不明白這裏的觀察對象是如何起作用的。他們觀察的主題是什麼?它們與食品飲料和飲用咖啡方法有何關係? – Philipp

+0

不,不適合我。從我的角度來看,我不知道什麼時候吃喝(主線程方法)會被調用。他們可以被稱爲五分鐘或更多。我只是希望每個AbstractObserver子類實例都能夠在主線程的吃喝方法被調用時同時運行drinkCoffee或eatFood。 – sneeze1

+0

您對併發有任何限制嗎?例如,在之前的o.eatFood()結束之前,是否可以調用同一對象上的後續o.eatFood()? –

回答

2

使一些簡化的假設在這裏,你不關心收到通知時吃/喝完成後,你還可以使用執行框架拋出的工作到一個隊列:

// declare the work queue 
    private final Executor workQueue = Executors.newCachedThreadPool(); 




    // when you want to eat, schedule a bunch of 'eating' jobs 
     public void eat(final Food item){ 
      for (final AbstractObserver o: list) { 
      workQueue.execute(new Runnable() { 

       @Override 
       public void run() { 
        o.eatFood(item); // runs in background thread 
       } 
      }); 
      } 
     } 

在出口你的程序,你必須關閉執行程序:

workQueue.shutdown(); 
+0

此方法仍然要求您修改每種方法(吃,喝等),但至少您只有一個workQueue(不是20),並且執行程序框架爲您管理線程。如果您需要限制併發性,就像使用FixedThreadPool交換執行程序一樣簡單。 – JVMATL

+0

我認爲assylias的答案也是正確的,但這似乎是最簡單的解決方案。 – sneeze1

2

我不是專業人士,但也許你可以使用生產者 - 消費者設置。在這裏,作爲觀察實體的生產者可以在其自己的線程的隊列中添加一個通知,這裏的觀察者將從同一個隊列中獲取消息,但是會在其自己的線程中獲得通知。

2

爲了詳細說明氣墊船的回答,基本實現您的觀察者可能看起來像這樣:

class ObserverA implements Runnable { 
    private final BlockingQueue<Food> queue = new ArrayBlockingQueue<>(); 

    public void eatFood(Food f) { 
     queue.add(f); 
    } 

    public void run() { 
     try { 
      while (true) { 
       Food f = queue.take(); //blocks until some food is on the queue 
       //do what you have to do with that food 
      } 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
      //and exit 
     } 
    } 
} 

所以,你的代碼,它eatFood將從該方法立即返回並不會阻止你的主線程。

您顯然需要爲觀察者直接分配一個線程:new Thread(observerA).start();或通過ExecutorService,這可能更容易,更可取。


或者,您可以在「觀察」對象級別創建線程:

private static final ExecutorService fireObservers = Executors.newFixedThreadPool(10); 

public void eat(Food food) { 
    for (AbstractObserver o : observers) { 
     //(i) if an observer gets stuck, the others can still make progress 
     //(ii) if an observer throws an exception, a new thread will be created 
     Future<?> f = fireObservers.submit(() -> o.dataChanged(food)); 
     fireObservers.submit(new Callable<Void>() { 
      @Override public Void call() throws Exception { 
       try { 
        f.get(1, TimeUnit.SECONDS); 
       } catch (TimeoutException e) { 
        logger.warn("Slow observer {} has not processed food {} in one second", o, food); 
       } catch (ExecutionException e) { 
        logger.error("Observer " + o + " has thrown exception on food " + food, e.getCause()); 
       } 
       return null; 
      } 
     }); 
    } 
} 

(我主要是複製了here粘貼 - 你可能需要以使其適應您的需求)。

+0

謝謝,我想我明白這是如何工作的。我的問題是,如果我走這條路線,我將不得不爲主線程中的每個方法自己創建隊列。在你的例子中,我相信我必須爲drinkFood創建一個方法。在我的項目中,我有大約20種需要這種方法的方法,所以如果我嘗試自己處理20個隊列,然後處理run方法中的queue.take()方法,恐怕會變得很難處理。 – sneeze1

+0

@ sneeze1看我的編輯。 – assylias