0
生產者 不同的線程擁有自己的SynchronizedBlockingQueue。每個生產者都將消息放入自己的隊列中。如何在沒有經紀人的情況下將消息傳遞給消費者
消費者
不同的主題將從無論是從隊列中的任何一個獲得消息,並啓動過程。
現在爲了溝通生產者和消費者,我們需要經紀人。這可能是瓶頸。消費者是否有其他方式從任何生產者那裏獲得一條消息並開始流程。
生產者 不同的線程擁有自己的SynchronizedBlockingQueue。每個生產者都將消息放入自己的隊列中。如何在沒有經紀人的情況下將消息傳遞給消費者
消費者
不同的主題將從無論是從隊列中的任何一個獲得消息,並啓動過程。
現在爲了溝通生產者和消費者,我們需要經紀人。這可能是瓶頸。消費者是否有其他方式從任何生產者那裏獲得一條消息並開始流程。
既然你沒有指定一種語言,我想我會提供一個使用Ada編程語言的通用示例。這個例子讓消費者只需打印來自生產者的消息,但它給出了你描述的生產者 - 消費者架構。
with Ada.Task_Identification; use Ada.Task_Identification;
package Multiple_Producer is
type Producer_Message is private;
protected type Buffer is
entry Set_Message (Item : in Producer_Message);
entry Get_Message (Item : out Producer_Message);
private
Msg : Producer_Message;
Is_New : Boolean := False;
end Buffer;
type Buf_Alias is access all Buffer;
type Buf_Array is array (Positive range <>) of aliased Buffer;
type Buf_Access is access all Buf_Array;
task type Producer is
entry Set_Buffer (Item : Buf_Alias);
entry Stop;
end Producer;
task Consumer is
entry Set_Buffers (Item : Buf_Access);
entry Stop;
end Consumer;
private
type Producer_Message is record
the_Task : Task_Id;
Value : Integer;
end record;
end Multiple_Producer;
with Ada.Text_IO; use Ada.Text_IO;
package body Multiple_Producer is
--------------
-- Producer --
--------------
task body Producer is
Message : Producer_Message := (Current_Task, 0);
The_Buf : Buf_Alias;
begin
accept Set_Buffer(Item : in Buf_Alias) do
The_Buf := Item;
end Set_Buffer;
loop
select
accept Stop;
exit;
else
delay 0.01;
The_Buf.Set_Message(Message);
Message.Value := Message.Value + 1;
end select;
end loop;
end Producer;
--------------
-- Consumer --
--------------
task body Consumer is
Message : Producer_Message;
Buffers : Buf_Access;
begin
accept Set_Buffers(Item : Buf_Access) do
Buffers := Item;
end Set_Buffers;
loop
select
accept Stop;
exit;
else
-- Poll the buffers
for I in Buffers'Range loop
select
Buffers(I).Get_Message(Message);
Put_Line(Image(Message.The_Task) & ": " &
Integer'Image(Message.Value));
or
delay 0.001;
end select;
end loop;
end select;
end loop;
end Consumer;
------------
-- Buffer --
------------
protected body Buffer is
-----------------
-- Set_Message --
-----------------
entry Set_Message (Item : in Producer_Message) when not Is_New is
begin
Msg := Item;
Is_New := True;
end Set_Message;
-----------------
-- Get_Message --
-----------------
entry Get_Message (Item : out Producer_Message) when Is_New is
begin
Item := Msg;
Is_New := False;
end Get_Message;
end Buffer;
end Multiple_Producer;
with Multiple_Producer; use Multiple_Producer;
procedure Main is
subtype Producer_Range is Positive range 1..5;
The_Producers : array(Producer_Range) of Producer;
The_Buffers : Buf_Access := new Buf_Array(Producer_Range);
begin
for I in Producer_Range loop
The_Producers(I).Set_Buffer(The_Buffers(I)'Access);
end loop;
Consumer.Set_Buffers(The_Buffers);
delay 4.0;
for P of The_Producers loop
P.Stop;
end loop;
Consumer.Stop;
end Main;
language - Java – user3805189
您需要公開每個生產者的隊列,並讓每個消費者輪詢最粗略情況下的每個生產者。我寧願使用代理,因爲架構會更優雅(也更易於使用),如果存在瓶頸,大多數代理(rabbitmq,activemq等)都支持分佈式架構。沒有這些,你必須自己去做。 – Adonis