2017-06-16 69 views
0

我想使用2 RDD之間的連接方法並將其保存到cassandra但我的代碼不起作用。在開始時,我得到了一個巨大的Main方法,一切運行良好,但是當我使用函數和類時,這不起作用。我是新來斯卡拉和火花斯卡拉/ Spark可串行化錯誤 - 加入不起作用

代碼:

class Migration extends Serializable { 

    case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable 
    case class siteExternalId(site_external_id: Option[String]) extends Serializable 
    case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable 

    def SparkMigrationProfile(sc: SparkContext) = { 

    val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE) 
    .keyBy[userId] 
    .filter(x => x._2.site_external_id != None) 

    val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE) 
    .keyBy[userId] 

    //dont work 
    test.join(profileRDD) 
    .foreach(println) 

    // don't work 
    test.join(profileRDD) 
    .saveToCassandra(keyspace, table) 

    } 

在beginig我得到了著名:線程「main」 org.apache.spark.SparkException例外:在任務不能序列。 。 。 所以我擴展我的主類和案例類,但stil不工作。

回答

0

我認爲你應該將你的案例類從Migration類移到專用文件和/或對象。這應該可以解決你的問題。另外,Scala案例類默認是可序列化的。

+0

它的工作!我現在變得如此愚蠢。 。 。 你能向我解釋爲什麼? – user3394825

+0

hi @ user3394825,很難說,因爲我沒有在Cassandra中使用Spark。根據我的經驗,當使用其他類中定義的案例類時,我遇到了類似的問題。在你的情況下,爲'cassandraTable'函數創建隱式參數可能會有一些問題(https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/ com/datastax/spark/connector/SparkContextFunctions.scala)例如'rrf:RowReaderFactory [T], ev:ValidRDDType [T]',但我只是猜測。我知道當使用Spark SQL Encoder時,也有類似的例外。 –

+0

案例類在技術上可以訪問封裝的遷移實例的內部類。當它們被序列化時,附帶的遷移對象也會被序列化。即使它被標記爲可序列化,但其中可能有某個實例變量不在其中。通常罪魁禍首是一個SparkContext對象。 –