2014-11-21 53 views
2

我們使用apache Camel進行路由和從文件中提取。駱駝 - 來自兩個來源的數據

我有一種情況,我需要從共享文件夾中的文件和數據庫中獲取數據。我只有在來自雙方的數據到達時才需要合併數據。如果任何一方沒有收到,那麼我的數據合併過程應該等到雙方都在場。

可能嗎?我怎樣才能做到這一點?任何示例代碼?

回答

0

某些東西必須觸發該過程 - 無論是文件還是數據庫都要選擇一個。

然後,您可以使用enricher pattern來填充其他來源(當數據準備就緒時)。聚合策略用於組合數據。你通常在java中編寫聚合策略。

該鏈接提供瞭如何豐富和合並數據的示例。您可以瞭解如何處理Camel文檔中的數據庫和文件。

+0

使用聚合戰略,說文件已經到來,駱駝過程被踢,但數據庫中的行沒有。在這種情況下,我想等待並開始輪詢,直到數據庫行也到達。這可能嗎?對不起,如果我提出了上面的鏈接,但問題很愚蠢。我希望這兩件事情都發生在單一的駱駝環境中。 – nav 2014-11-22 22:11:06

0

我將這個用於處理日誌的壓縮文件一起使用。我附上了一個例子,希望它能幫助你。

//Archived 
    from("direct:" + EnvironmentSetup.ARCHIVED) 
     .routeId(ROUTES.ARCHIVED.name()) 
     .setHeader(HEADER_ZIP_AGG_ID, header(Exchange.FILE_NAME)) 
     .setHeader(HEADER_AFTER_ZIP_DEST).constant(getArchiveUri()) 
     .setHeader(HEADER_STATUS).constant(STATUS.SUCCESS) 
     .pipeline() 
      .to("direct:" + EnvironmentSetup.ARCHIVED_ZIP) 
     .end() 
     .pipeline() 
      .setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".report")) 
      .setBody(header(ProcessManager.PROCESS_LOG).convertToString()) 
      .to("direct:" + EnvironmentSetup.ARCHIVED_ZIP) 
     .end() 
    .end(); 

    from(
     "direct:" + EnvironmentSetup.DECRYPT_FAILED_ZIP, 
     "direct:"+EnvironmentSetup.PROCESS_FAILED_ZIP, 
     "direct:"+EnvironmentSetup.ARCHIVED_ZIP 
    ) 
     .routeId("ZIP") 
      .aggregate(header(HEADER_ZIP_AGG_ID), new CopiedGroupedExchangeAggregationStrategy()) 
      .completionSize(2) 
      .marshal(zipFileDataFormat) 
       .multicast() 
       .pipeline() 
        .setHeader(Exchange.FILE_NAME, simple(String.format(
         "${in.header.%s}/${in.header.%s}", HEADER_EMAIL, Exchange.FILE_NAME))) //header(HEADER_EMAIL). header(Exchange.FILE_NAME)) 
        //.dynamicRouter(header(HEADER_AFTER_ZIP_DEST)) 
        .to("direct:dynamic") 

       .end() 
       .pipeline() 
        .marshal(encryption) 
        .setHeader(Exchange.FILE_NAME, simple(String.format(
         "${in.header.%s}/${in.header.%s}.gpg", HEADER_EMAIL, Exchange.FILE_NAME))) 
        //.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".gpg")) 
        .to("direct:"+EnvironmentSetup.SEND_BACK) 
       .end() 
      .end() //end aggregate 
     .end(); 

CopiedGroupedExchangeAggregationStrategy.java

public class CopiedGroupedExchangeAggregationStrategy extends 
                 AbstractListAggregationStrategy<Exchange> { 

    @Override 
    public boolean isStoreAsBodyOnCompletion() { 
     // keep the list as a property to be compatible with old behavior 
     return true; 
    } 

    @Override 
    public Exchange getValue(Exchange exchange) { 
     return exchange.copy(); 
    } 

}