我有一個彈簧整合流程,它以通道inboundadapter開始,並拾取文件並將它們作爲消息傳遞給系統。 在幾個組件之後,根據發佈策略或30秒的組超時,郵件將在「聚合器」上進行聚合。 下游處理有另一堆組件直到最後一個。聚合器發佈兩次彈簧整合消息
我面臨的問題是這樣的, 當我根據關聯ID發送33個創建33個「組/桶」的文件時,在「聚合器」處聚合,一些文件或消息似乎被「釋放」兩次。我之所以得出這樣的結論是因爲我有一個頻道攔截器,第一次在成功完成下游處理後,第二次顯示幾條消息通過「發佈」通道(正好出現在聚合器之後)。此外,這種行爲導致我的應用程序找不到文件並拋出我看到的異常。這導致我得出結論:消息桶/組/ corrID以某種方式被「發佈」兩次。
我試圖調試很多方法,但本質上,我想知道corrID /桶在被釋放後如何成功地通過單個線程中的所有下游組件,可以再次「釋放」。
我的問題是,我該如何調試?我想知道是什麼讓這個消息/桶重新出現在聚合器中。 我的聚合器是如下,
<int:aggregator id="bufferedFiles" input-channel="inQueueForStage"
output-channel="released" expire-groups-upon-completion="true"
send-partial-result-on-expiry="true" release-strategy="releaseHandler"
release-strategy-method="canRelease"
group-timeout-expression="size() > 0 ? T(com.att.datalake.ifr.loader.utils.MessageUtils).getAggregatorTimeout(one, @sourceSnapshot) : -1">
<int:poller fixed-delay="${files.pickup.delay:3000}"
max-messages-per-poll="${num.files.pickup.per.poll:10}"
task-executor="executor" />
</int:aggregator>
說明聚合的:尺寸()> 0適用於每個相關桶。由於文件名,我發送的33個文件中的每一個都會產生/生成/創建一個新的存儲桶,因此聚合器將有33個存儲桶/組/ corrIds,每個存儲桶只包含一個文件。 因此聚合器SPEL表達式只是說如果沒有發佈策略,那麼如果組確實至少有一些文件,則在30秒後釋放桶/組。
我的頻道站適配器如下:
<int-file:inbound-channel-adapter id="files" channel="dispatchFiles" directory="${source.dir}" scanner="directoryScanner"> <int:poller fixed-delay="${files.pickup.delay:3000}" max-messages-per-poll="${num.files.pickup.per.poll:10}" /> </int-file:inbound-channel-adapter>
日誌 這裏是消息完成流程的第一時間的日誌。調用完成時間建議到達最後一個組件「completionHandler」SA。
說明日誌: 「心病」 是桶/走廊裏被釋放兩次。我得到最終例外的原因是因爲在第一次,該文件從該原始位置被移除並被處理。所以第二次發生這種錯誤的發佈時,沒有什麼可以在那裏處理的。 從中可以看出,首批/走廊裏/桶加工和周圍11時09分結束,而第二個是圍繞11時10
的重要的一點我注意到,這個啓動畫面行爲只發生在我有一個全局的頻道攔截器,在這個攔截器中我做了很長的處理。當這個攔截器被註釋掉時,錯誤就會消失。
問題: 是否有可能在任何情況下聚合器雙重釋放batch/corrId?我怎樣才能讓聚合器發射任何日誌?
感謝
編輯晚上10:15
匯聚下面我的頻道有一個攔截如下,
public Message<?> preSend(Message<?> message, MessageChannel channel) { LOGGER.info("******** Releasing from aggregator(interceptor) , corrID:{} at time:{} ********",MessageUtils.getCorrelationId(message), new Date()); finalReporter.callback(channel.toString(), message); return message; }
從聚合到最終compeltionHandler SA,我有單線程處理 Aggregator - > releasedChannel - > s ome SA1 - > some channel - > ..... - > completionChannel-> completeSA
當我運行33個分區時,讓我們按照corrId =「alh」第一次釋放時,它看起來像下面, 它顯示的是,線程5發佈它,它應該處理所有的下游組件。但它留下它的中途,並開始做其他的事情,並通過不同影響線程稍後重新拾起如下, 這似乎/好像是問題,
解決方案更新: 我以下三件事情進行排序工作周圍,此刻,
出於某種原因,我的攔截器在做
return super.preSend(message, channel)
而不是簡單地return message
。我改成了後者我有一個全球的渠道攔截,我刪除全局和保持個別
如果通道攔截了返回前的任何問題,會導致一個新的版本?
儘管我仍然可以看到圖片中描述的上述場景,但我沒有得到雙重處理嘗試,因此它避免了錯誤。我仍然試圖從這個意義上理解。
我明白這太具體而且難以解釋;仍然感謝您的時間和意見...
請分享,例外,'<入站通道適配器>',日誌也不錯。對於最後一個我的意思是'log4j.category.org.springframework.integration = DEBUG'。請擴展一下這樣的「聚合器」配置。當你說你發送了33個文件作爲組時,並不清楚,但同時你檢查'size()> ...' –
注意,expire-groups-upon-completion =「true」意味着具有相同相關ID的新消息將形成新的組;錯誤意味着舊的相關ID的新消息將被丟棄。但是,是的,顯示問題的完整DEBUG日誌將會很有用。 –
謝謝你這麼全面的解釋,但你的商業日誌不會說我什麼。需要了解消息如何通過渠道傳播。在這種情況下,我可以使用'correlationId'。難道你不認爲你爲同一個'correlationId'建立了不同的組? –