我是一個初學者與卡夫卡和火花。我想通過火花流實時處理關於特定主題從卡夫卡收到的數據。我無法使用由createStream函數返回的JavaPairReceiverInputDStream。kafka火花流java api問題
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
"testwordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(1));
Map<String, Integer> topics_map = new HashMap<String, Integer>();
topics_map.put("Customtopic", 10);
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
.createStream(jssc, "localhost:2181", "kafkasparkconsumer",
topics_map);
下面的代碼給出了一個錯誤:在類型JavaPairDStream
JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
的方法,圖(功能,R>)是不適用的參數(新PairFunction(){})SparkStreamingKafka。 java/Kafka-Spark/src/com/sd/kafka line 43 Java問題
我使用的spark版本是1.2.0。我找不到處理kafka消息的java api示例。誰能告訴我我需要改變什麼?