2009-12-07 98 views
3

方法我有一個隊列,其中我可以排隊不同的線程,所以可保證兩件事情:最佳線程同步隊列

  1. 請求處理一個接一個。
  2. 請求是在到達的順序處理

第二點是很重要的。否則,一個簡單的關鍵部分就足夠了。 我有不同的要求組,只有在一個組內,這些要點必須得到滿足。來自不同組的請求可以併發運行。

它看起來像這樣:

FTaskQueue.Enqueu('MyGroup'); 
try 
    Do Something (running in context of some thread) 
finally 
    FTaskQueue.Dequeu('MyGroup'); 
end; 

編輯:我已刪除的實際執行,因爲它隱藏我想解決

我需要這個,因爲我有一個基於印web服務器的問題接受http請求。首先,我爲請求找到一個相應的會話。然後爲該會話執行請求(代碼)。我可以爲同一個會話獲取多個請求(讀取第一個仍在處理的數據時我可以獲得新的請求),並且它們必須按照正確的到達順序逐個執行。所以我尋求一種通用的同步隊列,可以在這種情況下使用,以便請求可以排隊。我無法控制線程,每個請求都可能在不同的線程中執行。

什麼是最好的(ususal)方法來解決這類問題?問題是Enqueue和Dequeue必須是原子操作才能保證正確的順序。我目前的實施有一個很大的瓶頸,但它的工作。

編輯:貝婁是原子的入隊/出隊操作

你爾德normaly做這樣的事情的問題:

procedure Enqueue; 
begin 
    EnterCriticalSection(FCritSec); 
    try 
    DoEnqueue; 
    finally 
    LeaveCriticalSection(FCritSec); 
    end; 

    BlockTheCurrentThread; // here the thread blocks itself 
end; 

procedure Dequeue; 
begin 
    EnterCriticalSection(FCritSec); 
    try 
    DoDequeue; 
    UnblockTheNextThread; // here the thread unblocks another thread 
    finally 
    LeaveCriticalSection(FCritSec); 
    end; 
end; 

現在這裏的問題,這是不是原子。如果你有一個線程已經在隊列中,另一個線程來了並且調用Enqueue,它可能會發生,第二個線程將會離開關鍵部分並嘗試阻塞自己。現在線程調度器將恢復第一個線程,它將嘗試解除阻塞下一個(第二個)線程。但第二個線程還沒有被阻止,所以沒有任何反應。現在第二個線程繼續並阻止自己,但這是不正確的,因爲它不會被解除阻塞。如果阻塞位於關鍵部分內部,那麼臨界部分永遠不會離開,並且我們有一個死鎖。

+0

如果您有多個線程暫停和繼續彼此以確保只有一個線程在任何給定時間執行 - 那麼您應該意識到您的整個設計是錯誤的。在這種情況下,請求不應該等於一個線程。 – mghie 2009-12-07 12:05:09

+0

我懷疑這種方法是不正確的。但讓我們說我有一個Indy服務器。我在Web服務器事件處理程序中獲取http請求。我通過散列表找到一個會話,然後執行請求=爲該會話執行一些代碼。現在,如果在同一個會話中我收到多個請求,他們必須逐一執行。並且每個都將處於不同的線程環境中。我無法控制線程。如果你知道更好的方法,那麼請把它寫成答案。 – Runner 2009-12-07 12:13:45

+0

我明白了,但這些約束根本不可見於您的問題。我認爲,如果您刪除自己的解決方案的細節,只說明問題並尋求解決方法,您會得到更好的答案。 – mghie 2009-12-07 12:20:02

回答

9

的另一種方法:

讓每一個請求線程有一個手動重置事件即最初未設置。隊列管理器是一個維護此類事件的線程安全列表的簡單對象。 Enqueue()Dequeue()方法都將請求線程的事件作爲參數。

type 
    TRequestManager = class(TObject) 
    strict private 
    fCritSect: TCriticalSection; 
    fEvents: TList<TEvent>; 
    public 
    constructor Create; 
    destructor Destroy; override; 

    procedure Enqueue(ARequestEvent: TEvent); 
    procedure Dequeue(ARequestEvent: TEvent); 
    end; 

{ TRequestManager } 

constructor TRequestManager.Create; 
begin 
    inherited Create; 
    fCritSect := TCriticalSection.Create; 
    fEvents := TList<TEvent>.Create; 
end; 

destructor TRequestManager.Destroy; 
begin 
    Assert((fEvents = nil) or (fEvents.Count = 0)); 
    FreeAndNil(fEvents); 
    FreeAndNil(fCritSect); 
    inherited; 
end; 

procedure TRequestManager.Dequeue(ARequestEvent: TEvent); 
begin 
    fCritSect.Enter; 
    try 
    Assert(fEvents.Count > 0); 
    Assert(fEvents[0] = ARequestEvent); 
    fEvents.Delete(0); 
    if fEvents.Count > 0 then 
     fEvents[0].SetEvent; 
    finally 
    fCritSect.Release; 
    end; 
end; 

procedure TRequestManager.Enqueue(ARequestEvent: TEvent); 
begin 
    fCritSect.Enter; 
    try 
    Assert(ARequestEvent <> nil); 
    if fEvents.Count = 0 then 
     ARequestEvent.SetEvent 
    else 
     ARequestEvent.ResetEvent; 
    fEvents.Add(ARequestEvent); 
    finally 
    fCritSect.Release; 
    end; 
end; 

每個請求線程調用隊列管理Enqueue(),之後等待自己的事件,成爲信號。然後,它處理請求並調用Dequeue()

{ TRequestThread } 

type 
    TRequestThread = class(TThread) 
    strict private 
    fEvent: TEvent; 
    fManager: TRequestManager; 
    protected 
    procedure Execute; override; 
    public 
    constructor Create(AManager: TRequestManager); 
    end; 

constructor TRequestThread.Create(AManager: TRequestManager); 
begin 
    Assert(AManager <> nil); 
    inherited Create(TRUE); 
    fEvent := TEvent.Create(nil, TRUE, FALSE, ''); 
    fManager := AManager; 
    Resume; 
end; 

procedure TRequestThread.Execute; 
begin 
    fManager.Enqueue(fEvent); 
    try 
    fEvent.WaitFor(INFINITE); 
    OutputDebugString('Processing request'); 
    Sleep(1000); 
    OutputDebugString('Request processed'); 
    finally 
    fManager.Dequeue(fEvent); 
    end; 
end; 

{ TForm1 } 

procedure TForm1.Button1Click(Sender: TObject); 
var 
    i: integer; 
begin 
    for i := 1 to 10 do 
    TRequestThread.Create(fRequestManager); 
end; 

隊列管理器鎖定在兩者和Enqueue()Dequeue()事件的列表。如果Enqueue()中的列表爲空,它將在參數中設置事件,否則它將重置事件。然後它將事件追加到列表中。因此,第一個線程可以繼續請求,所有其他線程都會阻塞。在Dequeue()中,事件從列表頂部移除,並且設置下一個事件(如果有的話)。

這樣最後一個請求線程將導致下一個請求線程解除阻塞,完全沒有掛起或恢復線程。這個解決方案也不需要任何額外的線程或窗口,每個請求線程的單個事件對象就是所需要的。

+0

這正是我最初使用的方法。唯一的區別是我使用暫停/恢復,而不是事件,這是劣勢(我的方法是)。 但看看你遇到的問題(我更新了問題)。這些操作必須是原子的。這就是爲什麼我引入了aditional線程,所以所有操作都是從另一個線程發出的,並且它們完全是原子的,沒有任何關鍵部分。但是隨後引入了一個重要的瓶頸。如果你能解決這個問題,那麼我會接受你的答案,因爲在你的答案中,其他一切都已經達到最優。 – Runner 2009-12-07 13:48:32

+0

現在有兩種選擇。要麼我沒有看到什麼東西,解決方案很簡單,或者我完全在錯誤的樹上吠叫:) – Runner 2009-12-07 13:52:17

+0

啊,你發佈了代碼。是的,我認爲這應該起作用。我知道這很簡單,我錯過了一些東西:)我會嘗試以這種方式實現它。如果它擁有水,我相信它會,我會接受你的答案。我不認爲這可以做得更好。 – Runner 2009-12-07 14:02:13

2

我會回答你考慮過的評論中的其他信息。

如果您有多個需要序列化的線程,那麼您可以使用Windows免費提供的序列化機制。讓每個隊列成爲一個擁有自己的窗口和標準消息循環的線程。使用SendMessage()而不是PostThreadMessage(),Windows將負責阻止發送線程,直到消息已處理完畢,並確保維護正確的執行順序。通過爲每個請求組使用具有自己窗口的線程,確保多個組仍被同時處理。

這是一個簡單的解決方案,只有當請求本身可以在不同的線程上下文中處理時,它纔會起作用,在許多情況下這應該不是問題。

+1

請注意,這與用於將方法調用序列化到STA COM對象的原理相同。 – mghie 2009-12-07 12:47:39

+1

的確,這可能是一個很好的解決方案,是的,引入一個正在偵聽消息的新線程並不是問題。唯一的缺點是(除了虛擬窗口),就是這樣,我需要另一個線程池。我已經有來自Indy服務器的完美線程。但它可能是唯一可行的解​​決方案。讓我們等待如果有人有另一種方法 – Runner 2009-12-07 12:50:22

+0

請注意,沒有新的線程,因爲你已經有'TTaskQueueThread'。你只需讓它有一個窗口並使用阻塞消息處理。 – mghie 2009-12-07 12:59:36

0

您是否嘗試過Delphi提供的TThreadList對象?

它是線程安全的,它爲你管理鎖。您在主線程內管理線程外部的列表。

當請求要求新任務時,將其添加到列表中。當一個線程完成時,通過OnTerminate事件,你可以調用列表中的下一個線程。

+0

是的,我知道TThreadList。問題不在於線程安全列表,我已經介紹過了。問題是如何正確同步FIFO隊列。從mghie接受的答案完美地解決了這個問題。我的問題是,我採用了掛起/恢復線程的錯誤方法,而不是僅僅使用事件。 – Runner 2009-12-08 22:59:11