1

由於從Cassandra中查詢數據是有限制的,我試圖用Spark批量讀取數據並將其存儲在RDD中。不能在Spark中聯合使用兩個CassandraJavaRDD <CassandraRow>

然後我添加所有的RDD,使用聯合函數。

這是我的代碼。

private void getDataFromCassandra(JavaSparkContext sc) { 


    CassandraJavaRDD<CassandraRow> cassandraRDD = null ; 
    CassandraJavaRDD<CassandraRow> cassandraRDD2 = null; 

    While(Some Condition) 

    cassandraRDD = CassandraJavaUtil 
       .javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz") 
       .where("pid IN ('" + sb + "')"); 

    if(cassandraRDD2==null){ 


    cassandraRDD2=cassandraRDD; 
    } 
    else{ 
     cassandraRDD2 = cassandraRDD2.union(cassandraRDD); 
    } 
}    

}

但在工會,我發現了以下錯誤。

類型不匹配:不能轉換從JavaRDD到CassandraJavaRDD

雖然無論是RDD的是相似類型的。

所以1)須本人申請一個演員的

cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD); 

2)或在RDD之一的類型更改爲JavaRDD

+0

你在哪裏設置'cassandraRDD2'?它似乎總是空的。 –

+0

在if條件中,我將cassandraRDD2分配給cassandraRDD。 –

+0

你如何執行'null.isEmpty()'?因爲這就是你在那裏做的 –

回答

2

因爲根據docs的問題發生:

方法:聯合(JavaRDD其他)返回此RDD的聯合和另一個聯合。

返回值:JavaRDD

,因此不匹配。

因爲根據this

public class CassandraJavaRDD<R> extends JavaRDD<R> { 
... 
} 

CassandraJavaRDD類擴展JavaRDD所以可以使用:

JavaRDD<CassandraRow> cassandraRDD = null; 
JavaRDD<CassandraRow> cassandraRDD2 = null; 

因此union()方法的返回值將匹配其類型。

+0

感謝您的答覆。 –

+0

JavaRDD cassandraRDD2 = sc.emptyRDD(); JavaRDD cassandraRDD = sc。emptyRDD();我可以將這兩個空RDD聯合爲cassandraRDD2 = cassandraRDD2.union(cassandraRDD); ? –

+0

你應該可以做到。 –

相關問題