2015-04-01 204 views
1

我想使用聚合器創建兩條消息中的消息,但我不知道如何執行此操作。Spring集成聚合器

目前我正在閱讀一個目錄中的兩個文件,並希望將這些消息聚合爲一個。

我的整個項目看起來是這樣的:閱讀的.zip

- >傳遞給解壓到一個目錄自定義消息處理程序 - >從這個目錄中讀取文件 - >嘗試它們聚集

它如果我可以在解壓縮文件後發送帶有兩個有效負載的消息,但在閱讀完畢後進行聚合就足夠了。

我unzipper看起來是這樣的:

public class ZipHandler extends AbstractMessageHandler { 

File dat; 
File json; 

@Override 
protected void handleMessageInternal(Message<?> message) throws Exception { 
    byte[] buffer = new byte[1024]; 
    try { 
     File file = (File) message.getPayload(); 
     ZipFile zip = new ZipFile(file); 

     for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries 
       .hasMoreElements();) { 
      ZipEntry ze = entries.nextElement(); 
      String name = ze.getName(); 

      if (name.endsWith(".dat") || name.endsWith(".DAT")) { 
       InputStream input = zip.getInputStream(ze); 
       File datFile = new File("D:/lrtrans/zipOut" 
         + File.separator + name); 
       FileOutputStream fos = new FileOutputStream(datFile); 
       int len; 
       while ((len = input.read(buffer)) > 0) { 
        fos.write(buffer, 0, len); 
       } 
       this.dat = datFile; 
       fos.close(); 
      } else if (name.endsWith(".json") || name.endsWith(".JSON")) { 
       InputStream input = zip.getInputStream(ze); 
       File jsonFile = new File("D:/lrtrans/zipOut" 
         + File.separator + name); 
       FileOutputStream fos = new FileOutputStream(jsonFile); 
       int len; 
       while ((len = input.read(buffer)) > 0) { 
        fos.write(buffer, 0, len); 
       } 
       this.json = jsonFile; 
       fos.close(); 
      } 
     } 
     zip.close(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

} 
} 

它需要這些文件,並將它們放入兩個目錄,從我讀他們再次使用FileReadingMessageSource。 我也想解決這個問題,只使用基於註釋的記法,而不是xml。

編輯:

我只想用defaultAggregatingMessagegroupProcssor與correlationStrategy基於我的頭被稱爲「拉鍊」,並根據該消息,因爲在這種情況下,兩個文件應合併成一個releaseStrategy。

@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel") 
public DefaultAggregatingMessageGroupProcessor aggregate(){ 
    DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor(); 
    return aggregator; 
} 
@CorrelationStrategy 
public String correlateBy(@Header("zipFile") String zip){ 
    return zip; 
} 
@ReleaseStrategy 
public boolean isReadytoRelease(List<Message<?>> messages) { 
    return messages.size() == 2; 
} 

回答

0

我想說,你去正確的方式。由於您的zip文件中包含多個文件,因此將其解壓縮並將這些文件作爲一條消息收集併發送給後續進程是正確的要求。

所以,是的,<aggregator>是給你的。只需確定如何關聯和分組即可。

不知道如何解壓縮它們,但確實可以使用zip文件名作爲correlationKey,並使用大量文件來確定組釋放組的信號。

隨意問更多的問題。但首先我需要看到你的「解壓器」。

UPDATE

首先,基於註解聚合配置比較有限的,最好是用@ServiceActivatorAggregatingMessageHandler@Bean有超過其選項的更多控制。

但是,即使是您的選擇,您也可以達到您的要求。但是@Aggregator配置應該用POJO的方法調用原理如下:

@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel") 
public List<File> aggregate(List<File> files){ 
    return files; 
} 

類似的東西。

+0

我發現如何爲我的關聯策略創建標題,但我不確定如何使用標準的DefaultAggregatingMessageGroupProcessor。我想只是返回它並使用@Aggregator Annotation定義輸入和輸出通道,但是這似乎只是創建一個新的消息,並使用DefaultAggregatingMessageGroupProcessor作爲有效內容並丟棄舊的兩個文件。我將編輯此代碼到問題 – Blue 2015-04-07 09:04:17

+0

請參閱我的答案中的更新 – 2015-04-07 09:51:07