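Kafka streaming to Spark: word counts are not being reduced
I'm trying a basic example of streaming from Kafka into Spark Streaming. I'm very new to Spark and don't have much experience.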
My program is as follows (copied from the Apache Spark examples):
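import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public final class JavaKafkaWordCount {
    // Splits each Kafka message into words
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }
        String zkQuorum = args[0];
        String groupId = args[1];
        String topicsToListen = args[2];
        String numOfThread = args[3];
        // Helper copied from the Spark examples module; it must be on the classpath
        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the context with a 2-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        int numThreads = Integer.parseInt(numOfThread);
        // Topic name -> number of receiver threads consuming that topic
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = topicsToListen.split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        // Receiver-based stream of (key, message) pairs, coordinated through ZooKeeper
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, zkQuorum, groupId, topicMap);
        JavaDStream<String> lines = messages.map(Tuple2::_2);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
        // Count the words within each batch
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}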
Then I start my Kafka broker and producer and run the built jar with the following command:
$SPARK_HOME/bin/spark-submit --class "JavaKafkaWordCount" --master local[2] PATH_TO_JAR/kafka-spark-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar localhost:2181 test-consumer-group test 1
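To check what the four positional arguments turn into, the small class below repeats the program's own argument parsing without starting Spark. It assumes the command's trailing arguments are localhost:2181 test-consumer-group test 1; the class name ArgMappingDemo and its topicMap helper are hypothetical. Per the KafkaUtils.createStream documentation, the map value is the number of consumer threads per topic, so it controls how data is received, not how counts are aggregated.

```java
import java.util.HashMap;
import java.util.Map;

public final class ArgMappingDemo {
    // Hypothetical helper: mirrors the argument parsing in JavaKafkaWordCount.main
    // so the resulting topic map can be inspected without Spark or Kafka.
    static Map<String, Integer> topicMap(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                "Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
        }
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        // Comma-separated topic list; every topic gets the same thread count
        for (String topic : args[2].split(",")) {
            topicMap.put(topic, numThreads);
        }
        return topicMap;
    }

    public static void main(String[] args) {
        // Assumed reading of the spark-submit arguments
        String[] submitted = {"localhost:2181", "test-consumer-group", "test", "1"};
        System.out.println("zkQuorum = " + submitted[0]);
        System.out.println("groupId  = " + submitted[1]);
        System.out.println("topicMap = " + topicMap(submitted));
    }
}
```

With these arguments the last line printed is `topicMap = {test=1}`: one topic named test, read by one receiver thread.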
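When I produce some words from the Kafka producer, I expect the count of a word that has been published several times to increase, but instead I see a count of 1 printed for every new word published:
(hello,1)
I expected the count to increase when I publish the same word more than once, e.g.:
(hello,2)
But this doesn't happen. What am I misunderstanding here? Is it related to the arguments I'm passing, or to what the job is actually meant to do?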
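The behaviour described above matches how DStream operations work: reduceByKey is applied to each batch's RDD on its own (here every 2 seconds), so a word published once per batch is reported as 1 in each batch. A total across batches needs state carried from one batch to the next, which Spark Streaming offers through updateStateByKey or mapWithState (both need a checkpoint directory set on the context). The plain-Java sketch below only simulates this difference without Spark; the class BatchCountDemo, its methods, and the batch contents are hypothetical, and the running-total map merely stands in for Spark's state operators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class BatchCountDemo {
    // Per-batch count: what mapToPair(...).reduceByKey(...) yields for one batch
    static Map<String, Integer> countBatch(List<String> batch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : batch) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    // Running count: folds one batch's counts into state kept across batches
    // (a stand-in for what updateStateByKey would maintain; simulation only)
    static Map<String, Integer> updateState(Map<String, Integer> state,
                                            Map<String, Integer> batchCounts) {
        Map<String, Integer> next = new TreeMap<>(state);
        batchCounts.forEach((word, n) -> next.merge(word, n, Integer::sum));
        return next;
    }

    public static void main(String[] args) {
        // Hypothetical input: "hello" published once in each of three batches
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("hello"),
            Arrays.asList("hello"),
            Arrays.asList("hello", "world"));
        Map<String, Integer> state = new TreeMap<>();
        for (List<String> batch : batches) {
            Map<String, Integer> perBatch = countBatch(batch);
            state = updateState(state, perBatch);
            System.out.println("per batch: " + perBatch + "  running: " + state);
        }
    }
}
```

Running it prints:

per batch: {hello=1}  running: {hello=1}
per batch: {hello=1}  running: {hello=2}
per batch: {hello=1, world=1}  running: {hello=3, world=1}

The per-batch column corresponds to what wordCounts.print() shows; only the running column reaches (hello,2).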
Can anyone provide some insight?
Thanks, Shabbir