我們使用apache Camel進行路由和從文件中提取。駱駝 - 來自兩個來源的數據
我有一種情況,我需要從共享文件夾中的文件和數據庫中獲取數據。我只有在來自雙方的數據到達時才需要合併數據。如果任何一方沒有收到,那麼我的數據合併過程應該等到雙方都在場。
可能嗎?我怎樣才能做到這一點?任何示例代碼?
我們使用apache Camel進行路由和從文件中提取。駱駝 - 來自兩個來源的數據
我有一種情況,我需要從共享文件夾中的文件和數據庫中獲取數據。我只有在來自雙方的數據到達時才需要合併數據。如果任何一方沒有收到,那麼我的數據合併過程應該等到雙方都在場。
可能嗎?我怎樣才能做到這一點?任何示例代碼?
某些東西必須觸發該過程 - 無論是文件還是數據庫都要選擇一個。
然後,您可以使用enricher pattern來填充其他來源(當數據準備就緒時)。聚合策略用於組合數據。你通常在java中編寫聚合策略。
該鏈接提供瞭如何豐富和合並數據的示例。您可以瞭解如何處理Camel文檔中的數據庫和文件。
我將這個用於處理日誌的壓縮文件一起使用。我附上了一個例子,希望它能幫助你。
//Archived
from("direct:" + EnvironmentSetup.ARCHIVED)
.routeId(ROUTES.ARCHIVED.name())
.setHeader(HEADER_ZIP_AGG_ID, header(Exchange.FILE_NAME))
.setHeader(HEADER_AFTER_ZIP_DEST).constant(getArchiveUri())
.setHeader(HEADER_STATUS).constant(STATUS.SUCCESS)
.pipeline()
.to("direct:" + EnvironmentSetup.ARCHIVED_ZIP)
.end()
.pipeline()
.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".report"))
.setBody(header(ProcessManager.PROCESS_LOG).convertToString())
.to("direct:" + EnvironmentSetup.ARCHIVED_ZIP)
.end()
.end();
from(
"direct:" + EnvironmentSetup.DECRYPT_FAILED_ZIP,
"direct:"+EnvironmentSetup.PROCESS_FAILED_ZIP,
"direct:"+EnvironmentSetup.ARCHIVED_ZIP
)
.routeId("ZIP")
.aggregate(header(HEADER_ZIP_AGG_ID), new CopiedGroupedExchangeAggregationStrategy())
.completionSize(2)
.marshal(zipFileDataFormat)
.multicast()
.pipeline()
.setHeader(Exchange.FILE_NAME, simple(String.format(
"${in.header.%s}/${in.header.%s}", HEADER_EMAIL, Exchange.FILE_NAME))) //header(HEADER_EMAIL). header(Exchange.FILE_NAME))
//.dynamicRouter(header(HEADER_AFTER_ZIP_DEST))
.to("direct:dynamic")
.end()
.pipeline()
.marshal(encryption)
.setHeader(Exchange.FILE_NAME, simple(String.format(
"${in.header.%s}/${in.header.%s}.gpg", HEADER_EMAIL, Exchange.FILE_NAME)))
//.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".gpg"))
.to("direct:"+EnvironmentSetup.SEND_BACK)
.end()
.end() //end aggregate
.end();
CopiedGroupedExchangeAggregationStrategy.java
public class CopiedGroupedExchangeAggregationStrategy extends
AbstractListAggregationStrategy<Exchange> {
@Override
public boolean isStoreAsBodyOnCompletion() {
// keep the list as a property to be compatible with old behavior
return true;
}
@Override
public Exchange getValue(Exchange exchange) {
return exchange.copy();
}
}
使用聚合戰略,說文件已經到來,駱駝過程被踢,但數據庫中的行沒有。在這種情況下,我想等待並開始輪詢,直到數據庫行也到達。這可能嗎?對不起,如果我提出了上面的鏈接,但問題很愚蠢。我希望這兩件事情都發生在單一的駱駝環境中。 – nav 2014-11-22 22:11:06