我正在開發一個多線程應用程序,其中幾個「處理器」(ThreadPools中的Runnables)將消息發送給對方。他們通信使用BlockingQueue
接口:當處理器A
與任務T1
完成時,它推動它排隊Q1
(例如,如果BlockingQueue<MyTask>
T1
由MyTask
類表示);之後,處理器B
從Q1
拉任務,執行計算並將結果結果推送到Q2
;等等。RabbitMQ到BlockingQueue綁定
我使用LinkedBlockingQueue
,因爲我的應用程序是單片的,所有處理器都「活着」在同一個JVM中。但是,我希望我的應用程序變成模塊化的(Microservice Architecture),所以我決定使用RabbitMQ作爲消息代理。
問題是從隊列的Java實現遷移到RabbitMQ,只需對客戶端源代碼進行微小的更改。因此,我試圖找到RabbitMQ抽象和BlockingQueue
接口之間的某種綁定。所以,當有人向amqp隊列發送消息時,它應該出現在java隊列中。反之亦然:當有人推送一個對象到java隊列時,它應該傳播到amqp的交換。
下面介紹了輪詢的示例實現(使用spring-amqp,來自amqp的隊列)。
<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
MessageConverter messageConverter = listenerContainer.getMessageConverter();
listenerContainer.setupMessageListener((MessageListener) message -> {
Object task = messageConverter.fromMessage(message);
queue.offer(elementType.cast(task));
});
return queue;
}
我無法找到一個實現使用的RabbitMQ的隊列現在BlockingQueue
界面的框架。如果這種框架不存在,我的想法在某種程度上是架構錯誤的,還是隻有沒有人沒有實現過呢?
除了Gary的回答,我建議看看Spring Integration Framework提供的'PollableAmqpChannel':http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-頻道 –
加里,謝謝你的回答。其實,我的意思是這種實施,但我認爲它已經存在。我會考慮在我的項目中整合這個實現之後的貢獻(如果spring-integration不適用於我的情況)。 –
@ArtemBilan謝謝你,我會嘗試春季整合。第一印象是相當積極的。 –