2016-03-02

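Error joining a JavaPairRDD to a JavaPairDStream: incompatible types when using transform

I am trying to use transform to join a JavaPairRDD to the RDDs in a JavaPairDStream, but I get the following error: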

incompatible types: no instance(s) of type variable(s) W exist so that org.apache.spark.api.java.JavaPairRDD<java.lang.String,scala.Tuple2<LogType,W>> conforms to org.apache.spark.api.java.JavaRDD<U> 

Here is my code:

// Load full table from C* 
JavaPairRDD<String,CassTableType> table = javaFunctions(
    jssc 
).cassandraTable(
    "mykeyspace", "mytable", mapRowTo(CassTableType.class) 
).keyBy(
    t -> t.getLogId() 
); 

// Process stream 
JavaPairDStream<String, LogType> logs = flumeStream.flatMapToPair(
    flumeEvent -> {
        // Each Flume event yields one (logId, log) pair
        List<Tuple2<String, LogType>> events = new LinkedList<>();
        LogType log = LogType.parse(flumeEvent);
        events.add(new Tuple2<String, LogType>(log.getLogID(), log));
        return events;
    }
);

// Join RDD's in logs DStream to table 
JavaPairDStream<String, Tuple2<LogType, CassTableType>>
    joinedRDD = logs.transform(
    rdd -> rdd.join(table)
);

Does this mean that rdd is a JavaRDD<U> rather than a JavaPairRDD<String,LogType>, as I would expect from the JavaPairDStream<String,LogType> I am transforming? How can I implement this join?

Answer

Solved it by using transformToPair instead of transform.
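
Why this helps, as far as I understand the Spark Java API: on a JavaPairDStream<K,V>, transform expects a function that returns a JavaRDD<U>, while transformToPair expects one that returns a JavaPairRDD<K2,V2>. Since join returns a JavaPairRDD, and JavaPairRDD is not a subtype of JavaRDD, the compiler cannot find a U that fits. The lambda argument rdd is already a JavaPairRDD; it is the return type that does not match. In the question's code the fix would be logs.transformToPair(rdd -> rdd.join(table)). The sketch below reproduces only the shape of those signatures with plain Java so it runs without Spark. Every class in it (Pair, Rdd, PairRdd, PairStream) and the sample data are hypothetical stand-ins, not Spark types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins that mirror only the shape of the Spark Java API.
class TransformSketch {

    // Plays the role of scala.Tuple2.
    static final class Pair<A, B> {
        final A first;
        final B second;
        Pair(A first, B second) { this.first = first; this.second = second; }
    }

    // Plays the role of JavaRDD<T>.
    static final class Rdd<T> {
        final List<T> items;
        Rdd(List<T> items) { this.items = items; }
    }

    // Plays the role of JavaPairRDD<K, V>. Note that it does NOT extend Rdd.
    static final class PairRdd<K, V> {
        final List<Pair<K, V>> items;
        PairRdd(List<Pair<K, V>> items) { this.items = items; }

        // Inner join on key, similar in spirit to JavaPairRDD.join.
        <W> PairRdd<K, Pair<V, W>> join(PairRdd<K, W> other) {
            List<Pair<K, Pair<V, W>>> out = new ArrayList<>();
            for (Pair<K, V> left : items) {
                for (Pair<K, W> right : other.items) {
                    if (left.first.equals(right.first)) {
                        out.add(new Pair<>(left.first, new Pair<>(left.second, right.second)));
                    }
                }
            }
            return new PairRdd<>(out);
        }
    }

    // Plays the role of JavaPairDStream<K, V>, reduced to a single batch.
    static final class PairStream<K, V> {
        final PairRdd<K, V> batch;
        PairStream(PairRdd<K, V> batch) { this.batch = batch; }

        // Like transform: the function must return a plain Rdd<U>.
        <U> Rdd<U> transform(Function<PairRdd<K, V>, Rdd<U>> f) {
            return f.apply(batch);
        }

        // Like transformToPair: the function returns a PairRdd.
        <K2, V2> PairStream<K2, V2> transformToPair(Function<PairRdd<K, V>, PairRdd<K2, V2>> f) {
            return new PairStream<>(f.apply(batch));
        }
    }

    // Joins one batch of (logId, message) pairs with a (logId, owner) table.
    static PairStream<String, Pair<String, String>> joinLogsWithTable() {
        List<Pair<String, String>> tableRows =
            Arrays.asList(new Pair<>("id1", "alice"), new Pair<>("id2", "bob"));
        List<Pair<String, String>> logRows =
            Arrays.asList(new Pair<>("id1", "login"), new Pair<>("id3", "logout"));
        PairRdd<String, String> table = new PairRdd<>(tableRows);
        PairStream<String, String> logs = new PairStream<>(new PairRdd<>(logRows));

        // logs.transform(rdd -> rdd.join(table)) would not compile here:
        // PairRdd is not an Rdd<U>, the same mismatch as in the question.
        return logs.transformToPair(rdd -> rdd.join(table));
    }

    public static void main(String[] args) {
        PairStream<String, Pair<String, String>> joined = joinLogsWithTable();
        for (Pair<String, Pair<String, String>> p : joined.batch.items) {
            System.out.println(p.first + " -> (" + p.second.first + ", " + p.second.second + ")");
        }
    }
}
```

Swapping transformToPair for transform in joinLogsWithTable should produce a compile error of the same kind as the one quoted in the question, which is a quick way to convince yourself that the lambda's return type, not its argument type, is what the compiler rejects.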