2016-09-18 68 views
1

接收我是很新的星火流和我實現小型運動如從卡夫卡發送XML數據,需要接收數據通過火花流。我嘗試了所有可能的方式..但每次我得到空值。獲取空值,而從卡夫卡星火流

有一個在卡夫卡側沒有問題的,唯一的問題是從火花側接收流數據。

這裏是我如何實現代碼:

package com.package; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

public class SparkStringConsumer { 

    public static void main(String[] args) { 

     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 
     Set<String> topics = Collections.singleton("mytopic"); 

     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
     String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 
     directKafkaStream.foreachRDD(rdd -> { 
     System.out.println("--- New RDD with " + rdd.partitions().size() 
      + " partitions and " + rdd.count() + " records"); 
     rdd.foreach(record -> System.out.println(record._2)); 
     }); 


     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 

,我使用以下版本:

**動物園管理員3.4.6

斯卡拉2.11

星火2.0

卡夫卡0.8.2 **

回答

1

可以是這樣的:

directKafkaStream.foreachRDD(rdd ->{    
      rdd.foreachPartition(item ->{ 
       while (item.hasNext()) {  
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>"+item.next()); 
} 
} 
}); 

itme.next()包含鍵值對。並且您可以使用 item.next()._ 2

0

您的火花流應用程序看起來沒問題。我測試了它,它正在打印卡夫卡消息。您也可以在「Message Received」打印語句下面嘗試驗證kafka消息。

directKafkaStream.foreachRDD(rdd -> { 
    System.out.println("Message Received "+rdd.values().take(5)); 
    System.out.println("--- New RDD with " + rdd.partitions().size() 
     + " partitions and " + rdd.count() + " records"); 
    rdd.foreach(record -> System.out.println(record._2)); 
    }); 

如果您使用的動物園管理員然後設置以及卡夫卡PARAM

kafkaParams.put("zookeeper.connect","localhost:2181"); 

繼import語句我不是在你的程序所以加入看到這裏。

import org.apache.spark.streaming.kafka.KafkaUtils; 
import kafka.serializer.StringDecoder; 

還請驗證您是否可以在主題「mytopic」使用命令行卡夫卡控制檯消費者使用消息。

+0

您可以通過使用012ab獲取值@abaghel,感謝您的快速響應。我試着按照你的說法,仍然得到空的消息...這裏是消息: 收到消息[] ---新的RDD與1分區和0記錄 – user6325753

+0

你試過kafka控制檯消費者?你能看到那裏的消息嗎? – abaghel

+0

是在卡夫卡控制檯消費者嘗試....我可以看到那裏的消息... – user6325753

相關問題