2013-01-31 88 views
11

這是我的用例。一起使用Spring @Scheduled和@Async

傳統系統更新數據庫隊列表QUEUE。

我希望計劃定期作業 - 檢查隊列 的內容 - 如果有表中的行鎖定該行並做了一些工作 - 刪除隊列中排

如果以前的工作仍在運行,那麼將創建一個新線程來完成這項工作。我想配置最大併發線程數。

我使用Spring 3和我目前的解決辦法是以下(使用1毫秒的固定利率,以獲得線程基本上連續運行)

@Scheduled(fixedRate = 1) 
@Async 
public void doSchedule() throws InterruptedException { 
    log.debug("Start schedule"); 
    publishWorker.start(); 
    log.debug("End schedule"); 
} 

<task:executor id="workerExecutor" pool-size="4" /> 

這創造了4個線程直客做和線程正確分享隊列中的工作量。但是,當線程需要很長時間才能完成時,我似乎正在獲取內存泄漏。

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0        |    80 | 373,410,496 |  89.74% 
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940       |   48 | 373,410,136 |  89.74% 
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68 

所以

1:我應該使用@Async和@Scheduled在一起嗎?

2:如果不是那麼我怎麼才能用彈簧來達到我的要求?

3:僅當其他線程忙時才能創建新線程?

謝謝大家!

編輯:我覺得作業隊列漸漸無限長的......現在,使用

<task:executor id="workerExecutor" 
    pool-size="1-4" 
    queue-capacity="10" rejection-policy="DISCARD" /> 

將報告與結果

+4

這豈不是沒有'@ Async'正常工作?無論如何,用'@ Scheduled'註解的方法應該是異步執行的。 – ach

+0

如果你想讓「線程連續運行」,那麼你應該不會真的在首先使用@Scheduled。它的用途是「預定」的活動,而不是連續的活動...... – JoeG

+0

你可能會考慮製作publishWorker.start();方法異步。 –

回答

0
//using a fixedRate of 1 millisecond to get the threads to run basically continuously 
@Scheduled(fixedRate = 1) 

當您使用@Scheduled一個新的線程將被創建並將在1毫秒的指定fixedRate處調用方法doSchedule。當你運行你的應用程序時,你已經可以看到4個線程正在競爭QUEUE表,並且可能會出現死鎖。

調查線程轉儲是否存在死鎖。 http://helpx.adobe.com/cq/kb/TakeThreadDump.html

@Async註釋在這裏沒有任何用處。

更好的實現方法是通過實現runnable並將您的類傳遞給具有所需線程數的TaskExecutor來創建類作爲線程。

Using Spring threading and TaskExecutor, how do I know when a thread is finished?

還要檢查你的設計似乎並沒有被正確地處理同步。如果之前的作業正在運行並且在該行上持有鎖,那麼您創建的下一個作業仍然會看到該行,並將等待獲取該行的鎖定。

2

您可以嘗試

  1. 運行調度與一秒的延遲,這將鎖定&獲取的未到目前爲止鎖定所有 隊列記錄。
  2. 對於每條記錄,調用一個Async方法,它會處理那條記錄,&將其刪除。
  3. 執行程序的拒絕策略應爲ABORT,以便調度程序可以解鎖尚未發出的QUEUE。這樣調度器可以在下次運行中再次處理這些QUEUE。

當然,您將不得不處理場景,調度程序已鎖定QUEUE,但處理程序沒有完成處理它,無論出於何種原因。

僞代碼:

public class QueueScheduler { 
    @AutoWired 
    private QueueHandler queueHandler; 

    @Scheduled(fixedDelay = 1000) 
    public void doSchedule() throws InterruptedException { 
     log.debug("Start schedule"); 
     List<Long> queueIds = lockAndFetchAllUnlockedQueues(); 
     for (long id : queueIds) 
      queueHandler.process(id); 
     log.debug("End schedule"); 
    } 
} 

public class QueueHandler { 

    @Async 
    public void process(long queueId) { 
     // process the QUEUE & delete it from DB 
    } 
} 
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10" 
    rejection-policy="ABORT"/>