2017-07-03 64 views
0

生產者 不同的線程擁有自己的SynchronizedBlockingQueue。每個生產者都將消息放入自己的隊列中。如何在沒有經紀人的情況下將消息傳遞給消費者

消費者

不同的主題將從無論是從隊列中的任何一個獲得消息,並啓動過程。

現在爲了溝通生產者和消費者,我們需要經紀人。這可能是瓶頸。消費者是否有其他方式從任何生產者那裏獲得一條消息並開始流程。

+0

您需要公開每個生產者的隊列,並讓每個消費者輪詢最粗略情況下的每個生產者。我寧願使用代理,因爲架構會更優雅(也更易於使用),如果存在瓶頸,大多數代理(rabbitmq,activemq等)都支持分佈式架構。沒有這些,你必須自己去做。 – Adonis

回答

0

既然你沒有指定一種語言,我想我會提供一個使用Ada編程語言的通用示例。這個例子讓消費者只需打印來自生產者的消息,但它給出了你描述的生產者 - 消費者架構。

with Ada.Task_Identification; use Ada.Task_Identification; 

package Multiple_Producer is 
    type Producer_Message is private; 

    protected type Buffer is 
     entry Set_Message (Item : in Producer_Message); 
     entry Get_Message (Item : out Producer_Message); 
    private 
     Msg : Producer_Message; 
     Is_New : Boolean := False; 
    end Buffer; 
    type Buf_Alias is access all Buffer; 
    type Buf_Array is array (Positive range <>) of aliased Buffer; 
    type Buf_Access is access all Buf_Array; 

    task type Producer is 
     entry Set_Buffer (Item : Buf_Alias); 
     entry Stop; 
    end Producer; 


    task Consumer is 
     entry Set_Buffers (Item : Buf_Access); 
     entry Stop; 
    end Consumer; 

private 
    type Producer_Message is record 
     the_Task : Task_Id; 
     Value : Integer; 
    end record; 
end Multiple_Producer; 

with Ada.Text_IO; use Ada.Text_IO; 

package body Multiple_Producer is 

    -------------- 
    -- Producer -- 
    -------------- 

    task body Producer is 
     Message : Producer_Message := (Current_Task, 0); 
     The_Buf : Buf_Alias; 
    begin 
     accept Set_Buffer(Item : in Buf_Alias) do 
     The_Buf := Item; 
     end Set_Buffer; 

     loop 
     select 
      accept Stop; 
      exit; 
     else 
      delay 0.01; 
      The_Buf.Set_Message(Message); 
      Message.Value := Message.Value + 1; 
     end select; 
     end loop; 
    end Producer; 

    -------------- 
    -- Consumer -- 
    -------------- 

    task body Consumer is 
     Message : Producer_Message; 
     Buffers : Buf_Access; 
    begin 
     accept Set_Buffers(Item : Buf_Access) do 
     Buffers := Item; 
     end Set_Buffers; 

     loop 
     select 
      accept Stop; 
      exit; 
     else 
      -- Poll the buffers 
      for I in Buffers'Range loop 
       select 
        Buffers(I).Get_Message(Message); 
        Put_Line(Image(Message.The_Task) & ": " & 
          Integer'Image(Message.Value)); 
       or 
        delay 0.001; 
       end select; 
      end loop; 
     end select; 
     end loop; 
    end Consumer; 

    ------------ 
    -- Buffer -- 
    ------------ 

    protected body Buffer is 

     ----------------- 
     -- Set_Message -- 
     ----------------- 

     entry Set_Message (Item : in Producer_Message) when not Is_New is 
     begin 
     Msg := Item; 
     Is_New := True; 
     end Set_Message; 

     ----------------- 
     -- Get_Message -- 
     ----------------- 

     entry Get_Message (Item : out Producer_Message) when Is_New is 
     begin 
     Item := Msg; 
     Is_New := False; 
     end Get_Message; 

    end Buffer; 

end Multiple_Producer; 

with Multiple_Producer; use Multiple_Producer; 

procedure Main is 
    subtype Producer_Range is Positive range 1..5; 
    The_Producers : array(Producer_Range) of Producer; 
    The_Buffers : Buf_Access := new Buf_Array(Producer_Range); 
begin 
    for I in Producer_Range loop 
     The_Producers(I).Set_Buffer(The_Buffers(I)'Access); 
    end loop; 
    Consumer.Set_Buffers(The_Buffers); 
    delay 4.0; 
    for P of The_Producers loop 
     P.Stop; 
    end loop; 
    Consumer.Stop; 
end Main; 
+0

language - Java – user3805189

相關問題