2015-01-14 36 views
0

我有一個彈簧整合流程,它以通道inboundadapter開始,並拾取文件並將它們作爲消息傳遞給系統。 在幾個組件之後,根據發佈策略或30秒的組超時,郵件將在「聚合器」上進行聚合。 下游處理有另一堆組件直到最後一個。聚合器發佈兩次彈簧整合消息

我面臨的問題是這樣的, 當我根據關聯ID發送33個創建33個「組/桶」的文件時,在「聚合器」處聚合,一些文件或消息似乎被「釋放」兩次。我之所以得出這樣的結論是因爲我有一個頻道攔截器,第一次在成功完成下游處理後,第二次顯示幾條消息通過「發佈」通道(正好出現在聚合器之後)。此外,這種行爲導致我的應用程序找不到文件並拋出我看到的異常。這導致我得出結論:消息桶/組/ corrID以某種方式被「發佈」兩次。

我試圖調試很多方法,但本質上,我想知道corrID /桶在被釋放後如何成功地通過單個線程中的所有下游組件,可以再次「釋放」。

我的問題是,我該如何調試?我想知道是什麼讓這個消息/桶重新出現在聚合器中。 我的聚合器是如下,

<int:aggregator id="bufferedFiles" input-channel="inQueueForStage" 
     output-channel="released" expire-groups-upon-completion="true" 
     send-partial-result-on-expiry="true" release-strategy="releaseHandler" 
     release-strategy-method="canRelease" 
     group-timeout-expression="size() > 0 ? T(com.att.datalake.ifr.loader.utils.MessageUtils).getAggregatorTimeout(one, @sourceSnapshot) : -1"> 
     <int:poller fixed-delay="${files.pickup.delay:3000}" 
      max-messages-per-poll="${num.files.pickup.per.poll:10}" 
      task-executor="executor" /> 
    </int:aggregator> 

說明聚合的:尺寸()> 0適用於每個相關桶。由於文件名,我發送的33個文件中的每一個都會產生/生成/創建一個新的存儲桶,因此聚合器將有33個存儲桶/組/ corrIds,每個存儲桶只包含一個文件。 因此聚合器SPEL表達式只是說如果沒有發佈策略,那麼如果組確實至少有一些文件,則在30秒後釋放桶/組。

我的頻道站適配器如下:

<int-file:inbound-channel-adapter id="files" channel="dispatchFiles" directory="${source.dir}" scanner="directoryScanner"> <int:poller fixed-delay="${files.pickup.delay:3000}" max-messages-per-poll="${num.files.pickup.per.poll:10}" /> </int-file:inbound-channel-adapter>

日誌 這裏是消息完成流程的第一時間的日誌。調用完成時間建議到達最後一個組件「completionHandler」SA。 enter image description here message_appearing_twice_causing_error

說明日誌: 「心病」 是桶/走廊裏被釋放兩次。我得到最終例外的原因是因爲在第一次,該文件從該原始位置被移除並被處理。所以第二次發生這種錯誤的發佈時,沒有什麼可以在那裏處理的。 從中可以看出,首批/走廊裏/桶加工和周圍11時09分結束,而第二個是圍繞11時10

重要的一點我注意到,這個啓動畫面行爲只發生在我有一個全局的頻道攔截器,在這個攔截器中我做了很長的處理。當這個攔截器被註釋掉時,錯誤就會消失。

問題: 是否有可能在任何情況下聚合器雙重釋放batch/corrId?我怎樣才能讓聚合器發射任何日誌?

感謝

編輯晚上10:15

匯聚下面我的頻道有一個攔截如下,

public Message<?> preSend(Message<?> message, MessageChannel channel) { LOGGER.info("******** Releasing from aggregator(interceptor) , corrID:{} at time:{} ********",MessageUtils.getCorrelationId(message), new Date()); finalReporter.callback(channel.toString(), message); return message; }

從聚合到最終compeltionHandler SA,我有單線程處理 Aggregator - > releasedChannel - > s ome SA1 - > some channel - > ..... - > completionChannel-> completeSA

當我運行33個分區時,讓我們按照corrId =「alh」第一次釋放時,它看起來像下面, alh first time released 它顯示的是,線程5發佈它,它應該處理所有的下游組件。但它留下它的中途,並開始做其他的事情,並通過不同影響線程稍後重新拾起如下, alh second thread-8 這似乎/好像是問題,

解決方案更新: 我以下三件事情進行排序工作周圍,此刻,

  1. 出於某種原因,我的攔截器在做return super.preSend(message, channel)而不是簡單地return message。我改成了後者

  2. 我有一個全球的渠道攔截,我刪除全局和保持個別

  3. 如果通道攔截了返回前的任何問題,會導致一個新的版本?

儘管我仍然可以看到圖片中描述的上述場景,但我沒有得到雙重處理嘗試,因此它避免了錯誤。我仍然試圖從這個意義上理解。

我明白這太具體而且難以解釋;仍然感謝您的時間和意見...

+0

請分享,例外,'<入站通道適配器>',日誌也不錯。對於最後一個我的意思是'log4j.category.org.springframework.integration = DEBUG'。請擴展一下這樣的「聚合器」配置。當你說你發送了33個文件作爲組時,並不清楚,但同時你檢查'size()> ...' –

+0

注意,expire-groups-upon-completion =「true」意味着具有相同相關ID的新消息將形成新的組;錯誤意味着舊的相關ID的新消息將被丟棄。但是,是的,顯示問題的完整DEBUG日誌將會很有用。 –

+0

謝謝你這麼全面的解釋,但你的商業日誌不會說我什麼。需要了解消息如何通過渠道傳播。在這種情況下,我可以使用'correlationId'。難道你不認爲你爲同一個'correlationId'建立了不同的組? –

回答

0

但是,是的。我認爲@GaryRussell是正確的:因爲您使用expire-groups-upon-completion="true"某些部分羣體可能會被髮布group-timeout-expression,並且具有相同correlationId的新消息將形成一個新羣體,該羣體將在下一個group-timeout發佈。你的size() > 0也不好。這意味着它將在group-timeout之後發佈部分羣組。也許size() > 1?不過,該組不能爲size() == 0。因爲它是在第一條消息上創建的,所以如果存在gruop,它至少包含一條消息。是的,組可以是空的,但在這種情況下,聚合器應標記爲expire-groups-upon-completion="false"。在這種情況下,它被標記爲completed並且不允許新消息。

0

經過調試和各種盲目的情況後,我相信至少我有一個解決方法和可能的根本原因。我會盡量勾勒所有我修改的東西,

根本原因:

我的攔截器調用了一個共同的回調方法的通用類。此方法基於請求來自的通道名稱,將決定採取的適當操作。這些行動主要是收集數據,增加計數器並堅持向數據庫提供一些信息。 看來,他們中的一些人有錯誤,因此,線程正在死亡,並重新發布消息。我不完全確定,如果情況並非如此,請糾正我。 但是,在我修復這些錯誤之後,重新發布問題似乎已經消退或完全消失。 難以診斷的原因是因爲我看不到在回調方法調用期間拋出的錯誤;可能是我抓到他們或可能是他們迷路了。

我還發現問題只出現在聚合器之後的任何通道攔截器上。彙總者之前的攔截者沒有提出任何問題;可能是因爲他們更簡單...

要調試, 我刪除了攔截,直接從各種組件(SAS),去除全球攔截做出回調,並試圖增加個人攔截特定的渠道。

感謝您的幫助。