2016-01-22 15 views
7

我正在寫一個自定義的DataFlow無界數據源,從卡夫卡0.8讀取。我想使用DirectPipelineRunner在本地運行它。然而,我發現了以下stackstrace:DirectPipelineRunner上使用自定義的DataFlow無界源碼

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700) 
     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87) 
     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174) 

這有一定道理,因爲我還沒有註冊的評估在任何時候我的自定義源。

閱讀https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只有評估者有界來源已註冊。爲定製無限源定義和註冊評估程序的推薦方法是什麼?

回答

3

DirectPipelineRunner當前僅運行有界輸入。我們正在積極努力消除這一限制,並希望很快發佈。

在此期間,你可以把平凡任何UnboundedSourceBoundedSource,用於測試目的,通過使用withMaxNumRecords,如下面的例子:

UnboundedSource<String> unboundedSource = ...; // make a Kafka source 
PCollection<String> boundedKafkaCollection = 
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10)); 

詳情請參閱this issue on GitHub


另外,在貢獻卡夫卡連接器方面還有幾項努力。您可能想通過our GitHub repository與我們和其他貢獻者進行交流。

+0

基本上我們使用的是老版本的客戶端API,所以我們在連接器方面的努力目前對您來說有點無用:-)我們希望我們能夠更新我們的kafka版本並最終使用一個更標準的連接器很快。 – bfabry