2

嗨,我是Spark Streaming的新手。我正在嘗試讀取xml文件並將其發送到kafka主題。這是我的Kafka代碼,它將數據發送到Kafka控制檯消費者。從kafka-Spark-Streaming讀取數據時獲得空集

代碼:

package org.apache.kafka.Kafka_Producer; 

import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.Properties; 
import java.util.Properties; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutionException; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("unused") 
public class KafkaProducer { 
    private static String sCurrentLine; 
    public static void main(String args[]) throws InterruptedException, ExecutionException{ 
     try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt"))) 
     { 
      while ((sCurrentLine = br.readLine()) != null) { 
       System.out.println(sCurrentLine); 
       kafka(sCurrentLine); 
      } 
     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace();} 
    } 
    public static void kafka(String sCurrentLine) { 
     Properties props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class","kafka.producer.DefaultPartitioner"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> producer = new Producer<String, String>(config); 
     producer.send(new KeyedMessage<String, String>("sample",sCurrentLine)); 
     producer.close(); 
    } 
} 

我能接受的卡夫卡控制檯,消費者的數據。在下面的屏幕截圖中,您可以看到我發送給該主題的數據。

enter image description here

現在我需要流,我用火花流發送到卡夫卡的控制檯消費者的數據。這是代碼。

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

public class SparkStringConsumer { 

    public static void main(String[] args) { 

     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 
     Set<String> topics = Collections.singleton("sample"); 

     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
     String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 
     directKafkaStream.foreachRDD(rdd -> { 
     System.out.println("--- New RDD with " + rdd.partitions().size() 
      + " partitions and " + rdd.count() + " records"); 
     rdd.foreach(record -> System.out.println(record._2)); 
     }); 
     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 

獲取emptyset同時提交我的工作是這樣的:

./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar 

下面你可以看到數據是如何接收的截圖:

enter image description here

使用以下版本:

Spark - 2.0.0

動物園管理員-3.4.6

卡夫卡 - 0.8.2.1

任何建議,請

+0

SparkReceiver類的代碼在哪裏?您已將SparkStringConsumer類用於主題「mytopic」,並將KafkaProducer類發送給主題爲「sample」的消息。你能檢查嗎? – abaghel

+0

現在更新你能再次經歷一次嗎? –

+0

嘗試在kafka中產生新的信息 –

回答

1

最後衝浪互聯網後,我發現這些解決方案。

不要同時使用「Spark-Submit」和「SetMaster」。

  • 如果您從IDE運行代碼,使用SetMaster在你的代碼
  • 如果運行通過罐子「星火提交」不要把setMaster在你的代碼

還有一首先運行/提交你的火花罐,然後將數據發送到卡夫卡 - 控制檯 - 消費者

工作正常。