2016-06-15 63 views
0

我想基準Spark vs Flink,爲此目的我正在進行多項測試。不過Flink並不適用於Kafka,與此同時Spark完美無缺。Fink與Kafka Consumer不起作用

的代碼非常簡單:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

val properties = new Properties() 
properties.setProperty("bootstrap.servers", "localhost:9092") 
properties.setProperty("group.id", "myGroup") 
println("topic: "+args(0)) 
val stream = env.addSource(new FlinkKafkaConsumer09[String](args(0), new SimpleStringSchema(), properties)) 
stream.print 

env.execute() 

我用卡夫卡0.9.0.0與同一主題(消費[弗林克]和製片人[卡夫卡控制檯]),但是當我在我的罐子發送到集羣,什麼都不會發生:

Cluster Flink

什麼它可能會發生?

+0

您是否正在閱讀預先填寫的Kafka主題(對於Flink和Spark具有相同的輸入)或者同時將數據寫入Kafka並從中讀取數據? –

+0

我通過生產者發送數據的同時Flink已啓動 –

+0

您是否嘗試過FlinkKafkaConsumer082連接器和/或指定zookeeper.connect屬性,如下所示:[link](http://stackoverflow.com/questions/31446374 /罐任何人股-A-弗林克-卡夫卡示例功能於階)?儘管文檔說FlinkKafkaConsumer09連接器不需要zookeeper.connect屬性,但它可能是一個很好的實驗。如果是,flink作業是否繼續運行?你在哪裏尋找輸出? – jagat

回答

0

對於這種特殊情況(源鏈接到接收器),Web接口將永遠不會報告發送/接收的字節/記錄。請注意,這將在不久的將來發生變化。

請檢查job-/taskmanager日誌是否不包含任何輸出。

+0

感謝您的答案,但在控制檯或者沒有任何反應 –

+0

我認爲在這種情況下,'taskmanager * .out'文件將包含輸出結果,如果你也將結果寫到Flink和Spark的Kafka話題上,應該會更容易測試 – aljoscha

+0

我在日誌裏有這個: - 消費者會閱讀下列主題(有分區的號碼):wordCount3(1) 然後: - 從JobManager空 斷開 - 接收的作業卡夫卡流 - 無法提交作業卡夫卡流,因爲...。 e沒有連接到JobManager。 –

0

您的stream.print不會在flink上的控制檯上打印,它會寫入flink0.9/logs/recentlog。除此之外,您可以添加自己的記錄器來確認輸出。

相關問題