2013-01-21 64 views
1

我有大量的狀態機。偶爾,狀態機將需要從一個狀態移動到另一個狀態,這可能很便宜或昂貴,並且可能涉及DB讀取和寫入等。從很多隊列中消耗

這些狀態更改是由於來自客戶端的傳入命令而發生的,並且可能隨時發生。

我想平行工作量。我想要一個隊列說'把這臺機器從這個狀態移到這個狀態'。顯然,任何一臺機器的命令都需要按順序執行,但如果我有很多線程,我可以並行向前移動許多機器。

我可以爲每個狀態機器設置一個線程,但狀態機的數量是依賴於數據的,可能有數百或數千個;我不想每個狀態機都有一個專用的線程,我想要一個某種類型的池。

我怎樣纔能有一個工作者池,但確保嚴格按順序處理每個狀態機的命令?

UPDATE:所以想象Machine實例有優秀的命令列表。當線程池中的執行程序完成使用命令時,如果它具有更多未完成的命令,它會將Machine放回到線程池的任務隊列中。所以問題是,如何在追加第一個命令時自動將Machine加入到線程池中?並確保這是所有線程安全?

+0

看看[這篇文章](http://java.dzone.com/articles/ensuring-order-execution-tasks)。也許它有幫助。 –

回答

2

我建議你這樣的場景:

  1. 創建線程池,可能是一些固定的大小與Executors.newFixedThreadPool
  2. 創建一些(可能這將是一個HashMap),它適用於每個狀態機一個Semaphore。這信號燈將具有1的值,將公平信號燈保持序列
  3. 在Runnable接口,這將做的工作對乞討只需添加semaphore.aquire()其狀態機和semaphore.release()的信號在運行方法的末尾。

隨着線程池的大小,您將控制並行度的級別。

2

我建議另一種方法。使用線程池來移動狀態機中的狀態,而不是使用線程池來處理所有事情,包括執行工作的。在做了一些導致狀態改變的工作之後,應該將狀態改變事件添加到隊列中。處理狀態更改後,應該將另一個do-work事件添加到隊列中。

假設狀態轉換是工作驅動的,反之亦然,無法進行連續處理。

將信號存儲在特殊映射中的想法非常危險。地圖必須被同步(添加/刪除objs是線程不安全的),並且執行搜索(可能在地圖上同步)然後使用信號量的開銷相對較大。

此外 - 如果您想在您的應用程序中使用多線程架構,我認爲您應該一路走下去。混合不同的體系結構可能會在以後出現麻煩。

+0

非常有效的一點;對不起,我不清楚,這些狀態變化是外部觸發的,我想在內部排隊。 – Will

1

每臺機器都有一個線程ID。產生期望數量的線程。讓所有線程貪婪地處理來自全局隊列的消息。每個線程都會鎖定當前消息的服務器,以獨佔方式使用它(直到它完成處理當前消息和其隊列中的所有消息),並且其他線程將該服務器的消息放入其內部隊列中。

編輯:處理消息的僞代碼:

void handle(message) 
    targetMachine = message.targetMachine 
    if (targetMachine.thread != null) 
    targetMachine.thread.addToQueue(message); 
    else 
    targetMachine.thread = this; 
    process(message); 
    processAllQueueMessages(); 
    targetMachine.thread = null; 

處理消息的Java代碼:(我可能會稍微過於複雜的事情,但是這應該是線程安全的)

/* class ThreadClass */ 
void handle(Message message) 
{ 
    // get targetMachine from message 
    targetMachine.mutexInc.aquire(); // blocking 
    targetMachine.messages++; 
    boolean acquired = targetMachine.mutex.aquire(); // non-blocking 
    if (acquired) 
    targetMachine.threadID = this.ID; 
    targetMachine.mutexInc.release(); 
    if (!acquired) 
    // can put this before release, it may speed things up 
    threads[targetMachine.threadID].addToQueue(message); 
    else 
    { 
    process(message); 
    targetMachine.messages--; 
    while (true) 
    { 
     while (!queue.empty()) 
     { 
     process(queue.pop()); 
     targetMachine.messages--; 
     } 
     targetMachine.mutexInc.acquire(); // blocking 
     if (targetMachine.messages > 0) 
     { 
     targetMachine.mutexInc.release(); 
     Thread.sleep(1); 
     } 
     else 
     break; 
    } 
    targetMachine.mutex.release(); 
    targetMachine.mutexInc.release(); 
    } 
}