0
我想使用2 RDD之間的連接方法並將其保存到cassandra但我的代碼不起作用。在開始時,我得到了一個巨大的Main方法,一切運行良好,但是當我使用函數和類時,這不起作用。我是新來斯卡拉和火花斯卡拉/ Spark可串行化錯誤 - 加入不起作用
代碼:
class Migration extends Serializable {
case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable
case class siteExternalId(site_external_id: Option[String]) extends Serializable
case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable
def SparkMigrationProfile(sc: SparkContext) = {
val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE)
.keyBy[userId]
.filter(x => x._2.site_external_id != None)
val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE)
.keyBy[userId]
//dont work
test.join(profileRDD)
.foreach(println)
// don't work
test.join(profileRDD)
.saveToCassandra(keyspace, table)
}
在beginig我得到了著名:線程「main」 org.apache.spark.SparkException例外:在任務不能序列。 。 。 所以我擴展我的主類和案例類,但stil不工作。
它的工作!我現在變得如此愚蠢。 。 。 你能向我解釋爲什麼? – user3394825
hi @ user3394825,很難說,因爲我沒有在Cassandra中使用Spark。根據我的經驗,當使用其他類中定義的案例類時,我遇到了類似的問題。在你的情況下,爲'cassandraTable'函數創建隱式參數可能會有一些問題(https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/ com/datastax/spark/connector/SparkContextFunctions.scala)例如'rrf:RowReaderFactory [T], ev:ValidRDDType [T]',但我只是猜測。我知道當使用Spark SQL Encoder時,也有類似的例外。 –
案例類在技術上可以訪問封裝的遷移實例的內部類。當它們被序列化時,附帶的遷移對象也會被序列化。即使它被標記爲可序列化,但其中可能有某個實例變量不在其中。通常罪魁禍首是一個SparkContext對象。 –