2015-05-22 44 views
1

基本上在我的FlowDef中,我想用我將通過API讀取的其他元組與我的元組進行連接。所以我創建了另一個空管道,並使用函數從API中填充元組。然後我做一個CoGroup並將其用作尾管。在級聯中創建一個「懸掛」管道

Tap tap1 = new Hfs(); 
Pipe pipe1 = new Pipe("pipe1"); 

Pipe pipeFromApi = new Pipe("pipeFromApi"); 
Pipe joinPipe = new CoGroup(pipe1, pipeFromApi); 
Tap tap2 = new Hfs(); 

flowDef.addSource(pipe1, tap1).addTailSink(joinPipe, tap2); 

這會導致以下錯誤 - 「管道名稱未在接收器或源映射中找到:'pipeFromApi'」。

有關如何實現此目的的任何想法?

回答

0

您可以創建一個虛擬SinkTap,並將其設置爲pipeFromApi的「源」。

管道「pipeFromApi」沒有水龍頭 - 每個管道應該有一個例外。

0

級聯中的管道用於命名管道組件的分支,管道組件的分支用於計劃將源頭或接收器綁定爲Tap。 Tap是管道組件的數據源或數據接收器。從這些定義中可以清楚地看出,爲了傳輸Tap所提供的數據,您需要將其與Pipe綁定。在你的代碼中,你已經定義了pipe pipeFromApi,但是沒有爲此創建任何源代碼(Tap)。另外,在創建這個Tap後,您需要將您的Pipe pipeFromApi與這個新創建的tap分別綁定到flowDef中。然後只有流程能夠識別兩個輸入源,一個是你的pipe1,另一個是你的pipeFromApi。 因此,正確的代碼片段可能如下所示:

Tap tap1 = new Hfs("..inputSource"); 
Pipe pipe1 = new Pipe("pipe1"); 

Tap tap2 = new Hfs("..inputSourceFromApi"); 
Pipe pipeFromApi = new Pipe("pipeFromApi"); 

Tap tap3 = new Hfs("..outputSinkDestination"); 
Pipe joinPipe = new CoGroup(pipe1, pipeFromApi); 

flowDef.addSource(pipe1, tap1).addSource(pipeFromApi, tap2).addTailSink(joinPipe, tap3);