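Error joining a JavaPairRDD to a JavaPairDStream: incompatible types when using transform

I am trying to use transform to join a JavaPairRDD to the RDDs in a JavaPairDStream, but I get the following error: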
incompatible types: no instance(s) of type variable(s) W exist so that org.apache.spark.api.java.JavaPairRDD<java.lang.String,scala.Tuple2<LogType,W>> conforms to org.apache.spark.api.java.JavaRDD<U>
Here is my code:
// Load full table from C*
JavaPairRDD<String, CassTableType> table = javaFunctions(jssc)
    .cassandraTable("mykeyspace", "mytable", mapRowTo(CassTableType.class))
    .keyBy(t -> t.getLogId());

// Process stream
JavaPairDStream<String, LogType> logs = flumeStream.flatMapToPair(
    flumeEvent -> {
        List<Tuple2<String, LogType>> events = new LinkedList<>();
        LogType log = LogType.parse(flumeEvent);
        events.add(new Tuple2<String, LogType>(log.getLogID(), log));
        return events;
    }
);

// Join RDDs in logs DStream to table
JavaPairDStream<String, Tuple2<LogType, CassTableType>> joinedRDD =
    logs.transform(rdd -> rdd.join(table));
Does this mean that rdd is a JavaRDD<U> rather than a JavaPairRDD<String,LogType>, as I would expect given that I am transforming a JavaPairDStream<String,LogType>? How can I achieve this join?
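
Reading the error closely, the problem seems to be the lambda's return type rather than the type of `rdd`. `transform` expects a function that returns a `JavaRDD<U>`, while `rdd.join(table)` returns a `JavaPairRDD`, which is not a `JavaRDD`. Spark's Java API also has `transformToPair`, whose function returns a `JavaPairRDD<K2,V2>`, so `logs.transformToPair(rdd -> rdd.join(table))` is the likely fix. Below is a minimal sketch of why one call is rejected and the other accepted. It is a plain-Java toy model, not Spark code: `Rdd`, `PairRdd`, `PlainStream`, `PairStream` and `joinDemo` are hypothetical stand-ins I made up, a stream is reduced to a single batch, and each key holds one value.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only: these classes are hypothetical stand-ins for
// JavaRDD, JavaPairRDD, JavaDStream and JavaPairDStream.
class TransformToPairSketch {

    // Stand-in for JavaRDD<T>.
    static class Rdd<T> { }

    // Stand-in for JavaPairRDD<K, V>; deliberately NOT a subtype of Rdd.
    // Simplification: one value per key.
    static class PairRdd<K, V> {
        final Map<K, V> entries;

        PairRdd(Map<K, V> entries) { this.entries = entries; }

        // Inner join on the key, pairing up the values from both sides.
        <W> PairRdd<K, Map.Entry<V, W>> join(PairRdd<K, W> other) {
            Map<K, Map.Entry<V, W>> out = new HashMap<>();
            for (Map.Entry<K, V> e : entries.entrySet()) {
                if (other.entries.containsKey(e.getKey())) {
                    out.put(e.getKey(), new AbstractMap.SimpleImmutableEntry<>(
                            e.getValue(), other.entries.get(e.getKey())));
                }
            }
            return new PairRdd<>(out);
        }
    }

    // Stand-in for JavaDStream<U>, reduced to a single batch.
    static class PlainStream<U> {
        final Rdd<U> batch;

        PlainStream(Rdd<U> batch) { this.batch = batch; }
    }

    // Stand-in for JavaPairDStream<K, V>, reduced to a single batch.
    static class PairStream<K, V> {
        final PairRdd<K, V> batch;

        PairStream(PairRdd<K, V> batch) { this.batch = batch; }

        // Same shape as transform: the function must return an Rdd<U>.
        <U> PlainStream<U> transform(Function<PairRdd<K, V>, Rdd<U>> f) {
            return new PlainStream<>(f.apply(batch));
        }

        // Same shape as transformToPair: the function returns a PairRdd<K2, V2>.
        <K2, V2> PairStream<K2, V2> transformToPair(
                Function<PairRdd<K, V>, PairRdd<K2, V2>> f) {
            return new PairStream<>(f.apply(batch));
        }
    }

    static Map<String, Map.Entry<String, String>> joinDemo() {
        Map<String, String> logData = new HashMap<>();
        logData.put("id-1", "log-1");
        logData.put("id-2", "log-2");
        Map<String, String> tableData = new HashMap<>();
        tableData.put("id-1", "row-1");

        PairStream<String, String> logs = new PairStream<>(new PairRdd<>(logData));
        PairRdd<String, String> table = new PairRdd<>(tableData);

        // logs.transform(rdd -> rdd.join(table)) does not compile:
        // the lambda returns a PairRdd, and no U makes that an Rdd<U>.
        PairStream<String, Map.Entry<String, String>> joined =
                logs.transformToPair(rdd -> rdd.join(table));
        return joined.batch.entries;
    }

    public static void main(String[] args) {
        // Only "id-1" is present on both sides, so only it survives the join.
        System.out.println(joinDemo());
    }
}
```

In both variants `rdd` is the pair type inside the lambda. Only the declared return type of the function differs, which is why swapping the method name should be enough in the real code.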