我想使用聚合器創建兩條消息中的消息,但我不知道如何執行此操作。Spring集成聚合器
目前我正在閱讀一個目錄中的兩個文件,並希望將這些消息聚合爲一個。
我的整個項目看起來是這樣的:閱讀的.zip
- >傳遞給解壓到一個目錄自定義消息處理程序 - >從這個目錄中讀取文件 - >嘗試它們聚集
它如果我可以在解壓縮文件後發送帶有兩個有效負載的消息,但在閱讀完畢後進行聚合就足夠了。
我unzipper看起來是這樣的:
public class ZipHandler extends AbstractMessageHandler {
File dat;
File json;
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
byte[] buffer = new byte[1024];
try {
File file = (File) message.getPayload();
ZipFile zip = new ZipFile(file);
for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries
.hasMoreElements();) {
ZipEntry ze = entries.nextElement();
String name = ze.getName();
if (name.endsWith(".dat") || name.endsWith(".DAT")) {
InputStream input = zip.getInputStream(ze);
File datFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(datFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.dat = datFile;
fos.close();
} else if (name.endsWith(".json") || name.endsWith(".JSON")) {
InputStream input = zip.getInputStream(ze);
File jsonFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(jsonFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.json = jsonFile;
fos.close();
}
}
zip.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
它需要這些文件,並將它們放入兩個目錄,從我讀他們再次使用FileReadingMessageSource。 我也想解決這個問題,只使用基於註釋的記法,而不是xml。
編輯:
我只想用defaultAggregatingMessagegroupProcssor與correlationStrategy基於我的頭被稱爲「拉鍊」,並根據該消息,因爲在這種情況下,兩個文件應合併成一個releaseStrategy。
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public DefaultAggregatingMessageGroupProcessor aggregate(){
DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor();
return aggregator;
}
@CorrelationStrategy
public String correlateBy(@Header("zipFile") String zip){
return zip;
}
@ReleaseStrategy
public boolean isReadytoRelease(List<Message<?>> messages) {
return messages.size() == 2;
}
我發現如何爲我的關聯策略創建標題,但我不確定如何使用標準的DefaultAggregatingMessageGroupProcessor。我想只是返回它並使用@Aggregator Annotation定義輸入和輸出通道,但是這似乎只是創建一個新的消息,並使用DefaultAggregatingMessageGroupProcessor作爲有效內容並丟棄舊的兩個文件。我將編輯此代碼到問題 – Blue 2015-04-07 09:04:17
請參閱我的答案中的更新 – 2015-04-07 09:51:07