2014-06-12 161 views
1

我使用calliope即spark插件與cassandra連接。我創建了2個RDDS它看起來像與兩個RDD一起工作apache spark

class A val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1") val sc1 = new SparkContext("local", "name it any thing ") var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1) var rddResult1 = rdd1.persist(persistLevel)

class B val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2") var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2) var rddResult2 = rdd2.persist(persistLevel)

莫名其妙下面的代碼庫使用其他2不工作這將創建一個新的RDD。是否有可能我們不能一起迭代2個RDD?

這裏是一個不正常的代碼片段 -

case class Report(id: Long, anotherId: Long) 

    var reportRDD = rddResult2.flatMap(f => { 
    val buf = List[Report]() 
    **rddResult1.collect().toList**.foldLeft(buf)((k, v) => { 
     val buf1 = new ListBuffer[Report] 
     buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => { 
     buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2) 
     }) 
    }) 
    }) 

而如果我更換了大膽的事情並初始化VAL它喜歡 -

val collection = rddResult1.collect().toList 

var reportRDD = rddResult2.flatMap(f => { 
    val buf = List[Report]() 
    **collection**.foldLeft(buf)((k, v) => { 
     val buf1 = new ListBuffer[Report] 
     buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => { 
     buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2) 
     }) 
    }) 
    }) 

它的工作原理,沒有任何交代?

+0

你得到了什麼錯誤? – maasg

+3

這是與該問題相關的很多代碼。清理它嗎?你會發現在Cassandra中沒有混合的相同行爲,以及'foldLeft'中的複雜性。否則它會提出一個很好的問題! –

回答

5

您正在將變換與動作混合使用。關閉rdd2.flatMap對工作人員執行,而rdd1.collect是Spark行話中的「操作」,並將數據傳回給驅動程序。所以,非正式地說,當你嘗試flatmap時,數據不在那裏。 (我不知道足夠的內部信息 - 是 - 查明確切的根本原因)

如果要分佈式操作兩個RDD,則應該使用其中一個聯接功能(聯接, leftOuterJoin,rightOuterJoin,cogroup)。

E.g.

val mappedRdd1 = rdd1.map(x=> (x.id,x)) 
val mappedRdd2 = rdd2.map(x=> (x.customerId, x)) 

val joined = mappedRdd1.join(mappedRdd2) 
joined.flatMap(...reporting logic..).collect 
+0

感謝您的幫助,但不知何故,我沒有獲得RDD的連接功能。然而,uning'新的PairRDDFunctions(rdd1).join(rdd2)'工作。 – tesnik03

+1

你應該'導入​​org.apache.spark.SparkContext._'並將你的原始rdd映射到一個pairRDD。加入是在關鍵字上完成的,這是PairRDD中元組的第一個元素。使用上面給出的例子:'val mappedRdd1 = rdd1.map(x =>(x.id,x))'在您的數據模型中使用一個自然PK就可以實現。 – maasg

+0

真棒..謝謝 – tesnik03

2

您可以在應用程序中對RDD進行操作。但是你不能在執行者(工作者節點)上操作RDD。執行者不能發出命令來驅動集羣。 flatMap中的代碼在執行程序上運行。

在第一種情況下,您嘗試對執行程序中的RDD進行操作。我估計你會得到一個NotSerializableException,因爲你甚至不能將RDD對象發送給執行者。在第二種情況下,您將RDD內容拖到應用程序中,然後將此簡單List發送給執行程序。 (Lambda捕獲會自動序列化。)