2016-04-27 66 views
1
HashMap<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", "localhost:9092"); 

String topics = "test4"; 
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" "))); 


JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, 
    StringDecoder.class, kafkaParams, topicsSet) 
    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { 
     @Override 
     public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) { 
     rdd.saveAsTextFile("output"); 
     return rdd; 
     } 
    }).map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> kv) { 
     return kv._2(); 
     } 
    }); 
stream1.print(); 
jssc.start(); 
jssc.awaitTermination(); 

交叉檢查主題「test4」中是否存在有效數據。使用Spark Streaming後無輸出

enter image description here

我期待的是從卡夫卡集羣流串,在控制檯的console.No例外印刷,也沒有輸出。 我在這裏失蹤的任何東西?

+2

你是否在StreamingContext上調用了'.start()'和'.awaitTermination()'? https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java#L109 – ccheneson

+0

Yes.Had實際上錯過了它。現在調用它。但仍然沒有輸出數據。 –

回答

4

您是否試圖在之後生成主題中的數據?

默認情況下,直接流使用配置auto.offset.reset =最大,這意味着,當沒有初始偏移量時,它會自動重置爲最大偏移量,所以基本上,您將只能讀取輸入的新消息流應用程序啓動後的主題。

1

由於ccheneson說,這可能是因爲你缺少.start().awaitTermination()

或者它可能是因爲transformations in Spark are lazy,這意味着你需要添加一個動作,以獲得滿意的結果。例如

stream1.print(); 

或者,它可能是因爲map正在對執行人進行,所以輸出將在執行程序的日誌,而不是駕駛者的日誌。

+0

是的。你是對的。使用打印更新代碼並開始。但是我無法在文件中看到任何輸出。交叉驗證數據在主題中可用。 –

+0

我可以更清楚地說明你最後的陳述嗎?我在本地運行它,這樣的工作不會在驅動程序本身運行嗎? –