2011-08-04 73 views
1

我有一組線程,其中每個線程必須等待其所需的輸入,做一些計算,並最終將其輸出值發送到特定的線程。發送處理的消息到特定的線程

我打算讓全局映射包含線程的名稱和線程本身,以便讓每個線程按名稱獲取其「後繼」線程,然後向它們發送值。

首先,我看着生產者 - 消費者如使用阻塞隊列:

class Consumer implements Runnable { 
    private final BlockingQueue queue; 

    Consumer(BlockingQueue q) { 
     queue = q; 
    } 

    public void run() { 
     try { 
      while(true) { 
       System.out.println("Waiting for input"); 
       consume(queue.take()); 
      } 
     } catch (InterruptedException ex) { 
      ex.printStackTrace(); 
     } 
    } 

    void consume(Object x) { 
     System.out.println("Received: " + x); 
    } 
} 

class Setup { 
    public static void main(String...args) { 
     BlockingQueue q = new ArrayBlockingQueue<String>(10); 
     Producer p = new Producer(q); 
     Consumer c1 = new Consumer(q); 
     Consumer c2 = new Consumer(q); 
     new Thread(p).start(); 
     new Thread(c1).start(); 
     new Thread(c2).start(); 
    } 
} 

我想我可以爲每個線程阻塞隊列。消費者線程然後將通過queue.take()循環,直到它接收到所有期望的值。

後來,我發現這post,其中有一個類似的問題我的問。提出的解決方案似乎比阻塞隊列解決方案更容易:它基於僅僅調用我希望發送消息的線程上的方法。

我想問你一些建議(因爲我認爲這是一種常見的情況),哪種方法最好,或者如果有更好的方法來實現我想要的。

非常感謝您的幫助。

回答

2

消費者生產者很好。 (那個「答案」你指的是在引用,這樣的問題是忌諱的話題..想通過...)

您可以使用QueuePipe,甚至PipedInputStreamPipedOutputStream。還有Exchanger

這是一個來自Exchanger javadoc的示例。不要擔心嵌套類,它只是一個簡潔的風格 - 與主題無關。

這裏我們有一個'管道'類。它有2個線程(名稱中的R/L表示左,右)。管道流量是R-> L。

/* 
* mostly based on 
* http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Exchanger.html 
*/ 
package so_6936111; 

import java.util.concurrent.Exchanger; 

public class WorkflowDemo { 

    public static void main(String[] args) { 
     Pipeline pipeline = new Pipeline(); 
     pipeline.start(); 
    } 
    // ---------------------------------------------------------------- 
    // Pipeline 
    // ---------------------------------------------------------------- 

    public static class Pipeline { 

     /** exchanger for messages */ 
     Exchanger<Message> exchanger = new Exchanger<Message>(); 

     /* the two message instances that are passed back and forth */ 
     Message msg_1 = new Message(); 
     Message msg_2 = new Message(); 

     /** startups the pipeline */ 
     void start() { 
      new Thread(new WorkerR()).start(); 
      new Thread(new WorkerL()).start(); 
     } 


     /** Message objects are passed between workflow threads */ 
     public static class Message { 
      private Object content; 
      public Object getContent() { return content; } 
      public void setContent(Object c) { this.content = c; } 
     } 


     /** WorkerR is at the head of the pipeline */ 
     class WorkerR implements Runnable { 
      public void run() { 
       Message message = msg_1; 
       try { 
        while (true) { 
         Object data = doSomeWork(); 
         message.setContent(data); 
         message = exchanger.exchange(message); 
        } 
       } catch (InterruptedException ex) { ex.printStackTrace();} 
      } 
      /** 
      * let's pretend this is where you get your 
      * initial data and do some work 
      */ 
      private Object doSomeWork() { 
       return String.format("[email protected]:%d", System.nanoTime()); 
      } 
     } 

     /** WorkerL is at the tail of the pipeline */ 
     class WorkerL implements Runnable { 
      public void run() { 
       Message message = msg_2; 
       try { 
        while (true) { 
         message = exchanger.exchange(message); 
         Object data = doPostProcessing(message.getContent()); 
         System.out.format("%s\n", data); 
        } 
       } catch (InterruptedException ex) { ex.printStackTrace();} 
      } 

      /** 
      * Let's pretend this is where the 2nd step of the workflow. 
      */ 
      private Object doPostProcessing(Object data) { 
       return String.format("%s | [email protected]:%d", data, System.nanoTime()); 
      } 
     } 
    } 
} 

輸出:

[email protected]:1312434325594730000 | [email protected]:1312434325594747000 
[email protected]:1312434325594750000 | [email protected]:1312434325594765000 
[email protected]:1312434325594768000 | [email protected]:1312434325594784000 
[email protected]:1312434325594787000 | [email protected]:1312434325594804000 
[email protected]:1312434325594806000 | [email protected]:1312434325594823000 
[email protected]:1312434325594826000 | [email protected]:1312434325594841000 
... 
+0

謝謝您的回答。交換器看起來像我需要的,而你說得對,我指出的另一個解決方案看起來不太好。我有一個最後的問題:如果我有幾個線程,並且讓我們說,生產者想發送消息到一個特定的線程,我該怎麼辦?每個線程應該有一個Exchanger實例?我很抱歉,如果這是一個愚蠢的問題,但我不想重新發明輪子。謝謝! –

+0

Rafael:只需更新您的問題,並提供您想要的細節。 (這個消費者/生產者的代碼只是噪音,fyi)。交換器是1:1,阻塞,無緩衝*交付*,只是爲了讓你瞭解爲什麼你需要在你的Q中確定你想要什麼,數據應該如何流動等等。上面只是一個*玩具*向您展示1:1 *管線*。 1:N或N:1?使用java.util.concurrent隊列。 – alphazero

相關問題