2013-08-16 162 views
1

我有一個使用彈簧集成的代碼庫。SI +觀察者模式

<integration-ftp:inbound-channel-adapter id="ftpInbound" 
           channel="ftpChannel" 
           session-factory="ftpClientFactory" 
           filter="myCustomFilter"                        
           auto-create-local-directory="true" 
           delete-remote-files="false"                  
           remote-directory="/foo/bar"                  
           local-directory="file:output"> 
    <integration:poller fixed-rate="5000" max-messages-per-poll="-1"/> 
</integration-ftp:inbound-channel-adapter> 



<integration:channel id="ftpChannel"> 
     <integration:queue /> 
</integration:channel> 

<integration:service-activator id="mySA" method="handleMessage" input-channel="ftpChannel" output-channel="outputChannel" ref="myDataGetter"> 
    <integration:poller fixed-rate="1000" max-messages-per-poll="-1"/> 
</integration:service-activator> 

myCustomFilter的bean工作正常,我在myDataGetter bean的handleMessage()方法中獲得過濾文件。

到目前爲止這麼好。 現在在myDataGetter bean中,我會根據日期進一步過濾文件,例如poller會給我9個文件,但實際上他們只有3個版本的3個日期。

data_file1.20130816 
data_file1.20130815 
data_file1.20130814 
data_file2.20130816 
data_file2.20130815 
data_file2.20130814 
data_file3.20130816 
data_file3.20130815 
data_file3.20130814 

現在我的目標是獲得最新的3個文件,即20130816個版本data_file1,2和3 所以我得到了一個邏輯建立在的handleMessage方法來構造,其中將包括這3個文件的最新版本HashMap中。它使用迭代數據列表的天真邏輯完成,並將主題中的文件與它進行比較。經過幾次迭代後,我得到了包含最新3個文件的HashMap。

現在我的下一個要求是通過一個通道將這3個文件傳遞給下一個bean。

但是應該只讀取HashMap完全用最新的3個數據文件構建的應該讀取通道數據的bean。 在SI中是否有任何可以完成的操作,以便只有在處理完所有傳入數據並過濾出一組數據後才能將數據傳送到下一個通道?

我可以考慮讓myDataGetter成爲obeservable,並將下一個通道的bean作爲觀察者。但它不適合SI在事情的完成方式。

有何評論?

回答

1

您可以使用帶有自定義發行策略的<aggregator/>