我正在處理大容量數據流〜每秒500個信息,數據在使用帶有10個併發使用者的SimpleMessageListenerContainer的Spring AMQP + Rabbit中消耗掉,我必須每15分鐘對Db進行一次檢查分鐘並重新加載某些屬性進行處理,這是通過一個石英觸發器完成的,該觸發器每15分鐘啓動一次,停止SimplelistenerContainer,完成必要的工作並再次啓動容器。Spring AMQP - 重複消息
當應用程序啓動時,當觸發器觸發並且容器重新啓動時,我看到相同的消息被多次傳遞,這會導致很多重複。消費者沒有任何放棄。
消息監聽器
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
在應用程序啓動時設置平行消費者開始消費
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
石英觸發
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}
嗨winash,我們將需要看到一些代碼和配置來幫助這個。相關的Spring配置將是一個很好的開始,以及Quartz作業代碼。 –
編輯原始問題添加代碼示例,這或多或少是如何構造代碼 – winash
經過一些調試發現該錯誤是與我的流初始化,我錯誤地責怪它的兔子,感謝您的幫助 – winash