2009-02-22 17 views
4

我有一些數據庫表,並且只要應用程序正在運行,就需要一次處理其中的記錄。因此,它看起來像這樣:需要設計「無限」線程的幫助

  1. 獲取尚未處理或尚未被其他線程處理的記錄。
  2. 處理它(這是一個很長的過程,取決於互聯網連接,因此它可能超時/拋出錯誤)。
  3. 移至下一條記錄。當到達桌子從頭開始結束時。

我沒有與線程太多的經驗,所以我看到兩種可能的策略:

方法A.

1,創建新的ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5); 

2 。添加5個任務:

for (int i = 0; i < 5; i++) { 
    taskExecutor.execute(new MyTask()); 
} 

3.每個任務都是無限循環,即:從表中讀取一條記錄,對其進行處理,然後獲取另一條記錄。

這種方法的問題是如何告知其他線程哪些記錄正在處理。要做到這一點,我可以在表中使用「status」字段,或者只使用一些CopyOnWriteArraySet來保存當前的處理ID。

方法B.

1.創建相同的ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5); 

2.具有無限循環,其選擇需要被處理的記錄,並將它們傳遞給執行:

while (true) { 
    //get next record here 
    taskExecutor.execute(new MyTask(record)); 
    //monitor the queue and wait until some thread is done processing, 
    //so I can add another record 
} 

3.每個任務處理單個記錄。

這種方法的問題是我需要將任務添加到執行程序的隊列中比處理它們要慢,以免它們隨着時間的推移而堆積起來。這意味着我不僅需要監視當前正在運行哪些任務,而且還需要監視它們何時處理完畢,因此我可以將新記錄添加到隊列中。

我個人認爲第一種方法更好(更容易),但我覺得第二種方法更爲正確。你怎麼看?或者我應該做一些完全不同的事情?

此外,如果需要,我可以使用Spring或Quartz庫。

謝謝。

回答

5

我認爲CompletionService(和ExecutorCompletionService)可以幫助你。

您通過完成服務提交所有的任務,它可以讓你等到線程中的一個(任何線程)完成其任務。只要有空閒線程,這樣您就可以提交下一個任務。這將意味着你使用的方法B.

僞代碼:在等待

Create ThreadPoolExecutor and ExecutorCompletionService wrapping it 

while (true) { 
    int freeThreads = executor.getMaximumPoolSize() - executor.getActiveCount() 
    fetch 'freeThreads' tasks and submit to completion service (which 
             in turn sends it to executor) 

    wait until completion service reports finished task (with timeout) 
} 

超時可以幫助您避免的情況時有隊列中沒有任務,所以所有線程處於空閒狀態,而你正在等待直到其中一人完成 - 這絕不會發生。

您可以通過ThreadPoolExecutor方法檢查空閒線程數:getActiveCount(活動線程)和getMaximumPoolSize(最大可用配置線程數)。您將需要直接創建ThreadPoolExecutor,或者轉換從Executors.newFixedThreadPool()返回的對象,但我更願意直接創建...有關詳細信息,請參閱Executors.newFixedThreadPool()方法的源代碼。

1

我會用這種方法:

使用一個線程來分配工作。此線程會產生5個其他線程並進入睡眠狀態。當工作線程結束時,它會喚醒工作分配器線程,然後產生新的工作線程並進入睡眠狀態......

4

另一種方法是使用尺寸5的ArrayBlockingQueue一個生產者線程會在桌子上,開始填充它,並把記錄作爲消費者處理它們。五個消費者線程將分別記錄()一個記錄,處理它並返回另一個記錄。通過這種方式,生產者線程可以確保一次不會向兩個線程分配記錄,並且消費者線程可以在獨立的記錄上工作。 Java Concurrency in Practice可能會給你更多的選擇,這是一個很好的閱讀這類問題。

+1

執行人可以完全按照相同的模式提交的工作線程。您可以配置隊列,拒絕策略,線程數量,峯值線程數量等。 – 2009-02-23 05:45:13

+0

+1 ArrayBlockingQueue – 2009-12-16 23:25:39

1

我想有一個靜態的集合中MyTask

public class MyTask implements Runnable { 
    private static ArrayList<RecordID> processed = new ArrayList<RecordID>(); 
    private static ArrayList<RecordID> processing = new ArrayList<RecordID>(); 

    private RecordID working = null; 

    public void run() { 
    for(;;) { 
     synchronized(MyTask.class) { 
     Record r = getUnprocessedRecord(); // use processed and processing to do query 
     if (r == null) { // no more in table to process 
      if (processing.length == 0) { // nothing is processing 
      processed.clear(); // this should allow us to get some results on the next loop 
      } 
      Thread.sleep(SLEEP_INTERVAL); 
      continue; 
     } else { 
      working = r.getRecordID(); 
      processing.add(working); 
     } 
     } 
     try { 
     //do work 
     synchronized(MyTask.class) { 
      processed.add(working); 
     } 
     } catch(Whatever w){ 
     } finally { 
     synchronized(MyTask.class) { 
      processing.remove(working); 
     } 
     } 
    } 
    } 

}

0

我一個人的意見,同春去石英。它是完美的選擇。現在已經用於生產2年了。當一些人已經做得最好的時候,爲什麼要試着重新發明輪子。更不用說運行它提供的不同模式。我會建議至少嘗試一下。