2015-09-10 43 views
1

我正在開發一個多線程應用程序,其中幾個「處理器」(ThreadPools中的Runnables)將消息發送給對方。他們通信使用BlockingQueue接口:當處理器A與任務T1完成時,它推動它排隊Q1(例如,如果BlockingQueue<MyTask>T1MyTask類表示);之後,處理器BQ1拉任務,執行計算並將結果結果推送到Q2;等等。RabbitMQ到BlockingQueue綁定

我使用LinkedBlockingQueue,因爲我的應用程序是單片的,所有處理器都「活着」在同一個JVM中。但是,我希望我的應用程序變成模塊化的(Microservice Architecture),所以我決定使用RabbitMQ作爲消息代理。

問題是從隊列的Java實現遷移到RabbitMQ,只需對客戶端源代碼進行微小的更改。因此,我試圖找到RabbitMQ抽象和BlockingQueue接口之間的某種綁定。所以,當有人向amqp隊列發送消息時,它應該出現在java隊列中。反之亦然:當有人推送一個對象到java隊列時,它應該傳播到amqp的交換。

下面介紹了輪詢的示例實現(使用spring-amqp,來自amqp的隊列)。

<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) { 
    LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(); 

    MessageConverter messageConverter = listenerContainer.getMessageConverter(); 
    listenerContainer.setupMessageListener((MessageListener) message -> { 
     Object task = messageConverter.fromMessage(message); 
     queue.offer(elementType.cast(task)); 
    }); 

    return queue; 
} 

我無法找到一個實現使用的RabbitMQ的隊列現在BlockingQueue界面的框架。如果這種框架不存在,我的想法在某種程度上是架構錯誤的,還是隻有沒有人沒有實現過呢?

回答

1

我不確定你是否真的想按照你所描述的方式來做 - 入站郵件將被傳送到隊列並存在內存中,而不是在RabbitMQ中。

我認爲一個簡單的BlockingQueue實現,它使用一個RabbitTemplate下(使用,或receiveAndConvert())可能是起飛/輪詢業務更好地拉離兔子隊列中的消息 - 它會在RabbitMQ的假消息,直到需要,並簡單地RabbitTemplate.convertAndSend()提供/放置操作。

雖然很簡單,但它可能是框架的一個有用補充;考慮contributing

+1

除了Gary的回答,我建議看看Spring Integration Framework提供的'PollableAmqpChannel':http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-頻道 –

+0

加里,謝謝你的回答。其實,我的意思是這種實施,但我認爲它已經存在。我會考慮在我的項目中整合這個實現之後的貢獻(如果spring-integration不適用於我的情況)。 –

+0

@ArtemBilan謝謝你,我會嘗試春季整合。第一印象是相當積極的。 –