2017-10-07 139 views
0

D中很容易使用std.container.dlist創建隊列類型。使用隊列在D中的線程之間進行通信

我想有多個線程,但讓他們與隊列通信,而不是消息傳遞(https://tour.dlang.org/tour/en/multithreading/message-passing)。據我所知,這些消息的設計始終是在代碼中的特定點處接收數據;接收線程將阻塞,直到收到預期的數據。 (編輯:我被告知有關receiveTimeout但沒有超時,只是一個檢查是真的在這種情況下更合適(也許超時0?)。我也不知道如果多個消息API將做什麼消息發送任何任何接收之前,我將不得不與該打。)

void main() { 
    spawn(&worker, thisTid); 

    // This line will block until the expected message is received. 
    receive (
     (string message) { 
      writeln("Received the message: ", text); 
     }, 
    ) 
} 

我所需要的僅僅接收數據,如果有一些。事情是這樣的:

void main() { 
    Queue!string queue// custom `Queue` type based on DList 

    spawn(&worker, queue); 

    while (true) { 
     // Go through any messages (while consuming `queue`) 
     for (string message; queue) { 
      writeln("Received a message: ", text); 
     } 
     // Do other stuff 
    } 
} 

我一直在使用shared變量(https://tour.dlang.org/tour/en/multithreading/synchronization-sharing)嘗試,但DMD抱怨說:「不許別名可變線程本地的數據。」或其他一些錯誤,具體情況。

這將如何在D中完成?或者,有沒有辦法使用消息來進行這種溝通?

回答

0

我已經得到了我需要的答案。

簡單地說,使用core.thread而不是std.concurrencystd.concurrency爲您管理郵件,並且不允許您自己管理郵件。 core.threadstd.concurrency在內部使用的。

較長的答案,這裏是我如何完全實現它。

我創建了Queue類型,它基於Singly Linked List,但保留最後一個元素的指針。根據Walter Brights願景(https://www.youtube.com/watch?v=cQkBOCo8UrE),Queue也使用標準組件inputRange和outputRange(或者至少我認爲它)。 Queue也被構建爲允許一個線程寫入,另一個線程在內部很少進行讀取操作,因此應該很快。
隊列我共享這裏https://pastebin.com/ddyPpLrp

一個簡單的實現爲具有第二螺紋讀取輸入:

Queue!string inputQueue = new Queue!string; 
ThreadInput threadInput = new ThreadInput(inputQueue); 
threadInput.start; 

while (true) { 
    foreach (string value; inputQueue) { 
     writeln(value); 
    } 
} 

ThreadInput被定義爲這樣的:

class ThreadInput : Thread { 
    private Queue!string queue; 

    this(Queue!string queue) { 
     super(&run); 
     this.queue = queue; 
    } 

    private void run() { 
     while (true) { 
      queue.put(readln); 
     } 
    } 
} 

代碼https://pastebin.com/w5jwRVrL
Queue再次https://pastebin.com/ddyPpLrp

1

這不回答具體問題,但TI不明朗起來什麼,我認爲是消息傳遞API的誤解......

就叫receiveTimeout而不是純receive

http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html

+0

不,這不是我真正需要的,但我確實想念'receiveTimeout'我不知道如何。如果我無法獲得其他任何工作,我可能會使用'receiveTimeout'來完成我所需要的工作。 –

+0

給receiveTimout一個負值,如-1會做你想做的。請參閱:https://stackoverflow.com/a/31624806/2026276 – Bauss

0

我使用這個:

shared class Queue(T) { 

    private T[] queue; 

    synchronized void opOpAssign(string op)(T object) if(op == "~") { 
     queue ~= object; 
    } 

    synchronized size_t length(){ 
     return queue.length; 
    } 

    synchronized T pop(){ 
     assert(queue.length, "Please check queue length, is 0"); 
     auto first = queue[0]; 
     queue = queue[1..$]; 
     return first; 
    } 

    synchronized shared(T[]) consume(){ 
     auto copy = queue; 
     queue = []; 
     return copy; 
    } 

}