我想要實現使用Spring集成的HTTP端點,這聽http請求,該請求數據作爲消息到信道和另一端點應該監聽該信道上的信息並處理其發送。Spring集成在隊列聽不輪詢
聽起來很簡單。但我想達到的是:
- 消息應按順序處理。
- 消息應該儘快處理(如果隊列已經爲空,則在http請求後沒有延遲)。
- http請求應在收到消息後立即作出響應,而不是在處理後才發回,因此發件人將只知道收到了處理消息。
- 我不想使用外部隊列,比如RabbitMQ。
所以我需要一個QueueChannel
爲此。但如果我理解正確,從隊列中接收消息的唯一方法是poller
。所以點2不會被滿足。收到消息後,輪詢者看到它之前會有小的延遲。
所以問題是:是否有任何簡單的方法來實現這一點,我看不到?
當然我可以自己實現它。例如創建SmartLifeCycle
組件,它監聽DirectChannel
,只是把消息轉換爲java.util.concurrent.BlockingQueue
,也啓動一個專門的線程將等待此隊列和消息發送到另一個DirectChannel
進行處理。所以不會有任何延遲,因爲一旦BlockingQueue
不爲空,線程將被解除阻止。
這一切聽起來像一個「模式」 - 基於專用線程的兩個直接通道之間的一些隊列。
也許有一個simplier方式,在Spring集成已經實現了,我只是不明白,因爲在這個領域的expirience由於缺少的嗎?
謝謝你,加里!我沒有意識到'receiveTimeout'實際上是一個時間,輪詢者的線程將等待一個支持'BlockingQueue'的新消息,直到它釋放Scheduler的線程。在此之後,它將根據輪詢者的「觸發器」(例如「延遲」)再次佔用線程。所以延遲'0'和'1s'的receiveTimeout'輪詢器幾乎會一直阻塞一些調度器的線程,並且儘快處理消息!非常感謝你! * P.S。單線程執行程序使用執行程序內部隊列的執行程序通道也是一個我沒有想到的可行解決方案! – djxak
請記住,如果發生服務器崩潰,這些選項中的任何一個都可能導致郵件丟失 - 如果這對您的應用程序是個問題,則需要使用某些持久存儲區(如rabbitmq)。 –
是的,我知道這一點。但即使使用RabbitMq,信息也可能丟失。調用者和入站http端點之間的網絡連接,收到消息之後但在將其發送到RabbitMq隊列之前的應用程序崩潰,調用者的崩潰 - 僅舉幾例。因此,不是使用外部隊列(無論如何不會保證「至少一次」),我最好安排定期從外部源獲取最後X分鐘內的所有消息,並且明智地執行它們。某種「補償性收集器」來處理可能丟失的消息。 – djxak