2017-08-31 103 views
0

我試圖整合火花和卡夫卡消費來自卡夫卡的消息。我有生產者代碼也發送關於「temp」主題的消息。另外,我正在使用Kafka的Console Producer來製作「temp」主題的消息。Kafka Spark Streaming Consumer將不會收到來自Kafka Console Producer的任何消息?

我已經創建了下面的代碼來使用來自同一個「temp」主題的消息,但它也不會收到單個消息。

計劃:

import java.util.Arrays; 
import java.util.Map; 
import java.util.HashMap; 
import static org.apache.commons.lang3.StringUtils.SPACE; 

import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import scala.Tuple2; 
import org.apache.log4j.Logger; 
import org.apache.spark.api.java.JavaSparkContext; 
import scala.collection.immutable.ListSet; 
import scala.collection.immutable.Set; 

public class ConsumerDemo { 

    public void main() { 
     String zkGroup = "localhost:2181"; 
     String group = "test"; 
     String[] topics = {"temp"}; 
     int numThreads = 1; 

     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]").set("spark.ui.port‌​", "7077").set("spark.executor.memory", "1g"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
     Map<String, Integer> topicMap = new HashMap<>(); 
     for (String topic : topics) { 
      topicMap.put(topic, numThreads); 
     } 
     System.out.println("topics : " + Arrays.toString(topics)); 
     JavaPairReceiverInputDStream<String, String> messages 
       = KafkaUtils.createStream(jssc, zkGroup, group, topicMap); 

     messages.print(); 

     JavaDStream<String> lines = messages.map(Tuple2::_2); 

     //lines.print(); 
     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); 

     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) 
       .reduceByKey((i1, i2) -> i1 + i2); 

     //wordCounts.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 

    public static void main(String[] args) { 
     System.out.println("Started..."); 
     new ConsumerDemo().main(); 
     System.out.println("Ended..."); 
    } 
} 

我加入之後在pom.xml文件的依賴性:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.9.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.11.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.2.0</version> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>0.9.0-incubating</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.3</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

    <dependency> 
     <groupId>org.anarres.lzo</groupId> 
     <artifactId>lzo-core</artifactId> 
     <version>1.0.5</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.8.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.module</groupId> 
     <artifactId>jackson-module-scala_2.10</artifactId> 
     <version>2.8.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.msiops.footing</groupId> 
     <artifactId>footing-tuple</artifactId> 
     <version>0.2</version> 
    </dependency> 

是我缺少一些依賴或問題是代碼?爲什麼此代碼不會收到任何消息?

+0

您是否可以用基於控制檯的消費者消費的消息?如果不是,那麼生產者可能會有問題。另外,請檢查您的端口號是否正確。我不認爲POM應該有任何問題,如果有的話,它不應該允許你建立/編譯項目。 –

+0

@ NileshPharate-是的,我可以使用卡夫卡的控制檯消費者使用消息,因此我們可以說這個問題與卡夫卡或zookeeper無關,即與我用於控制檯方法的相同的IP和端口。 – kit

回答

0

你沒有調用你有代碼來連接和使用來自Kafka的消息的方法。可以在public static void main()中寫入該邏輯,或者調用寫入該邏輯的方法。

0

當使用Kafka使用者時,特別是當我們在開發環境中進行測試和調試時,製作者可能會不斷地向Kafka發送消息。 在這種情況下,我們需要照顧這個卡夫卡消費者參數auto.offset.reset,它確定是否只讀取消費者開始運行後寫入主題的新消息?或者從主題

這裏開始讀的是在Kafka documentation給出的官方解釋:

auto.offset.reset
怎麼辦時,沒有初始卡夫卡或者偏移當前偏移不存在任何更多的服務器 上(例如,因爲該數據已經被刪除):

  1. 最早:自動重置偏移最早偏移
  2. 最新:自動復位偏移到最新偏移
  3. 沒有:拋出異常給消費者,如果沒有以前的偏移發現消費者的羣體
  4. 別的:拋出異常給消費者。

有關如何使用kafkaParams如下KafkaDStream創建示例代碼片段:

Map<String,String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("zookeeper.connect", "localhost:2181"); 
    kafkaParams.put("group.id", "test02"); //While you are testing the codein develeopment system, change this groupid each time you run the consumer 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("metadata.broker.list", "localhost:9092"); 
    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    Map<String, Integer> topics = new HashMap<String, Integer>(); 
    topics.put("temp", 1); 
    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); 

    JavaPairDStream<String, String> messages = 
     KafkaUtils.createStream(jssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topics, 
       storageLevel)  
     ; 
    messages.print(); 
+0

@ remisharoon-我只以毫秒爲單位獲取帶時間戳的消息。那是什麼意思?以下是示例輸出 - ------------------------------------------- 時間:1504785338000毫秒 ------------------------------------------- ------------------------------------------- Time:1504785340000 ms - ------------------------------------------ – kit

+0

@kit,意思是說一個「空DStream」。即。它沒有閱讀卡夫卡的任何記錄。在您開始SparkStreming作業 –

+0

@ remisharoon-之後,請嘗試寫信給Kafka主題。我正在從kafka的控制檯製作人發送消息給同一個kafka主題...仍然是在打印空的DStream ...這是什麼原因? – kit

相關問題