2012-10-22 33 views
0

我有如下設計:的Java的TimerTask應該等待一個阻塞隊列

有延伸TimerTask任務,定爲每分鐘運行。 此任務將嘗試從中央隊列(作爲單個用戶)獲取項目並將其表示寫入文件。

此外,有多個生產者不時將物品放入中央隊列中。

我感興趣的是每次執行任務時間(run() method executed)將提取所有從隊列中的項目,如果有項目,如果沒有項目做任何事。

生產者如果已滿,應睡在隊列中。

我對這個問題的解決方案是:

創建ExtractTask延伸的TimerTask。 ExtractTask將包含一個BlockingQueue。 每個生產者將通過執行方法getQueue()來接收對隊列實例的引用。 生產者將執行BlockingQueue.put()方法。 消費者將在run()中執行BlockingQueue.poll()方法。

你能提出一個更好的設計嗎?我的設計是否包含任何有問題的情況?這種設計可能遇到的任何同步問題?

+0

你有多個消費者嗎? –

+0

@Akram Berkawy:只有一個消費者 – Michael

回答

2

我想:

  • 保持在您的設計任務分開排隊,
  • 注入隊列,而不是進行查找,
  • 使用SchedulerService,而不是一個TimerTask

除了你已經得到它。

如果你願意冒險依賴春天,你應該看看Spring Integration。你描述的所有組件都在那裏。您還可以使用許多其他框架來解決問題,如駱駝或阿卡;我的主要觀點是如果你沒有必要,不要自己維護這些代碼。

免責聲明:我有點biased about Spring Integration

+0

+1,關於排隊注射,我沒有關注你,你想詳細說明一下嗎?我也對Spring集成有所偏見,儘管我對Spring比較陌生,但我很熟悉如何初始化這些對象,但是對於定時器功能我不太熟悉,不過很好的例子。 – Michael

+1

@邁克爾買我的書,裏面滿是例子:p。更嚴重的是,你可以看看https://github.com/SpringSource/spring-integration-samples – iwein

+0

謝謝,現在它似乎是最好的答案和一個很好的,我會等待一段時間,以獲得更多答案,給機會一些更有趣的想法,然後我會接受最好的一個。 – Michael

1

設計看起來不錯。這裏沒有太多的細節,所以很難確定。我建議將所有依賴注入到計時器任務中。

此外,你可能在沒有太多自定義代碼的情況下在Apache Camel中實現這一點。見https://camel.apache.org/timer.html

+0

+1,您是否想詳細說明如何將所有依賴注入到計時器任務中? – Michael

0

你說,在執行定時器當消費者將提取所有項目。

你應該要小心,從隊列中提取的所有項目沒有阻止操作的操作,它是poll()阻塞的方法調用的重複,這意味着在提取物品時 生產商將能夠添加項目到隊列中。

+0

不輪詢是否同步?我建議投票,不要採取,因爲它沒有阻塞時,隊列是空的,並返回null。我建議放,因爲它是阻塞的,如果隊列滿了,生產者會睡覺。 – Michael

+0

根據隊列的實現,會有或多或少的鎖定。民意調查/放是通常的路要走,雖然你可以優化使用drainTo並提供... – iwein

+0

我想使用像drainTo這樣的方法,雖然根據規範,似乎這種方法是不安全的。 – Michael

1

既然你問到的設計,我建議幾件事情:

  • 我會親自去執行人服務在計時器任務。看看here。如果需求更改爲,使用執行程序可確保您可以在將來以單獨的線程執行任務。
  • 嘗試將您的隊列與任務對象分開。
  • 一般在代碼中使用DI以使其可測試。
  • 我會讓生產者在他們的構造函數中接收隊列。
+0

感謝您的建議,因爲我已經回答了,並且在此問了一些誰刪除了他的帖子,您如何在執行Executor服務的同時每分鐘運行一次任務,(我只有一個重複任務) – Michael

+0

計劃線程池執行程序。 http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html希望這會有所幫助。 –

+0

+1,謝謝,這可能有幫助 – Michael

1

根據您的設計,我可以想到下面這樣的事情。 ConsumerTask可以使用泛型,但我很難弄清楚如何在Producer線程中執行相同的操作。生產者和消費者都對生產/消費的物品數量有限制。從TimerTask邏輯中取消timerTask本身的run()方法中的定時器對於它的停止至關重要。在這種情況下,只能使用毒藥丸方法來關閉。 如果使用Executors.newSingleThreadExecutor()或scheduledThreadPoolExecutor(),則可以使用shutdown()和shutdownNow()方法來停止生產者或使用者。雖然TimerTask是檢查ConcurrentQueue的工作的一個很好的例子,但它不會用於生產系統。

編輯 向生產者線程添加通用功能。構造函數現在使用一個模板類來實現將項目添加到隊列的方法。我定義了一個抽象類AddItem,它包含和addItem()方法,只要生產者想將項添加到隊列中,該方法就會被調用。

import java.util.Date; 
import java.util.Random; 
import java.util.Timer; 
import java.util.TimerTask; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicLong; 

public class ConsumerTask<T> extends TimerTask { 
    Timer timer; 
    ConcurrentLinkedQueue<T> itemQueue; 
    AtomicLong count = new AtomicLong(0); 
    final long limit; 

    public ConsumerTask(ConcurrentLinkedQueue<T> itemQ, long lim, int seconds) { 
     limit = lim; 
     timer = new Timer(); 
     timer.scheduleAtFixedRate(this, new Date(), seconds * 1000); 
     itemQueue = itemQ; 
    } 

    public void run() { 
     T item = itemQueue.peek(); 
     if (item != null) { 
      if (count.incrementAndGet() <= limit) { 
       System.out.println("Extracting Item : " + itemQueue.poll()); 
      } else { 
       System.out 
         .println("Consumed : " + (count.get() - 1) + " items"); 
       timer.cancel(); 
      } 

     } 
    } 

    public static void main(String args[]) throws InterruptedException { 
     ConcurrentLinkedQueue<Integer> itemQ = new ConcurrentLinkedQueue<Integer>(); 
     ConsumerTask<Integer> ct = new ConsumerTask<Integer>(itemQ, 10, 1); 

     new Thread(new Producer<Integer>(itemQ, new IntegerAddItem(itemQ), 20)) 
       .start(); 
     new Thread(ct).start(); 

    } 
} 

abstract class AddItem<T> { 
    ConcurrentLinkedQueue<T> itemQ; 
    T t; 

    public AddItem(ConcurrentLinkedQueue<T> itemQ) { 
     this.itemQ = itemQ; 
    } 

    abstract boolean addItem(); 

    public boolean addItem(T t) { 
     return itemQ.add(t); 
    } 
} 

class IntegerAddItem extends AddItem<Integer> { 
    public IntegerAddItem(ConcurrentLinkedQueue<Integer> itemQ) { 
     super(itemQ); 
    } 

    AtomicInteger item = new AtomicInteger(0); 

    @Override 
    boolean addItem() { 
     return addItem(item.incrementAndGet()); 
    } 

} 

class Producer<T> implements Runnable { 
    private final ConcurrentLinkedQueue<T> itemQueue; 
    AtomicInteger item = new AtomicInteger(0); 
    AtomicLong count = new AtomicLong(0); 
    AddItem<T> addMethod; 
    final long limit; 

    public Producer(ConcurrentLinkedQueue<T> itemQ, AddItem<T> addMethod, 
      long limit) { 
     itemQueue = itemQ; 
     this.limit = limit; 
     this.addMethod = addMethod; 
    } 

    public void run() { 
     while (count.getAndIncrement() < limit) { 
      addMethod.addItem(); 
      try { 
       Thread.sleep(new Random().nextInt(5000)); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       Thread.currentThread().interrupt(); 
      } 

     } 
    } 
} 
+0

謝謝,有趣的建議。 – Michael