2012-11-28 104 views
2

我正在處理大容量數據流〜每秒500個信息,數據在使用帶有10個併發使用者的SimpleMessageListenerContainer的Spring AMQP + Rabbit中消耗掉,我必須每15分鐘對Db進行一次檢查分鐘並重新加載某些屬性進行處理,這是通過一個石英觸發器完成的,該觸發器每15分鐘啓動一次,停止SimplelistenerContainer,完成必要的工作並再次啓動容器。Spring AMQP - 重複消息

當應用程序啓動時,當觸發器觸發並且容器重新啓動時,我看到相同的消息被多次傳遞,這會導致很多重複。消費者沒有任何放棄。

消息監聽器

class RoundRobinQueueListener implements MessageListener { 

@Override 
public void onMessage(Message message) { //do processing 
    } 

    } 

在應用程序啓動時設置平行消費者開始消費

final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); 
     RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance(); 
     messageListenerContainer.setQueueNames(queueName); 
     messageListenerContainer.setMessageListener(roundRobinListener); 
     messageListenerContainer.setConcurrentConsumers(10); 
     messageListenerContainer.setChannelTransacted(true); 

石英觸發

void execute(JobExecutionContext context) throws JobExecutionException { 
    messageListenerContainer.stop() 
    //Do db task, other processing 
    messageListenerContainer.start() 
    } 
+0

嗨winash,我們將需要看到一些代碼和配置來幫助這個。相關的Spring配置將是一個很好的開始,以及Quartz作業代碼。 –

+0

編輯原始問題添加代碼示例,這或多或少是如何構造代碼 – winash

+0

經過一些調試發現該錯誤是與我的流初始化,我錯誤地責怪它的兔子,感謝您的幫助 – winash

回答

0

看起來您的消息現在已被消費者確認。如果您未使用自動確認模式,則需要自行確認該消息(也可以在SimpleMessageListenerContainer配置此消息)。否則,代理會假定郵件未成功處理並嘗試再次發送郵件。

+0

我已經設置通道是事務性的,我也沒有明確設置任何確認模式(默認是auto)。 它確實看起來像一個確認問題,但隊列永遠不會建立,即消息確實得到處理。 – winash

+0

交易是否正確提交?在rmq中啓用調試日誌記錄,並檢查消息是否出列。 – mbelow

+0

現在就試試這個吧 – winash