我使用calliope即spark插件與cassandra連接。我創建了2個RDDS它看起來像與兩個RDD一起工作apache spark
class A val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1") val sc1 = new SparkContext("local", "name it any thing ") var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1) var rddResult1 = rdd1.persist(persistLevel)
class B val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2") var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2) var rddResult2 = rdd2.persist(persistLevel)
莫名其妙下面的代碼庫使用其他2不工作這將創建一個新的RDD。是否有可能我們不能一起迭代2個RDD?
這裏是一個不正常的代碼片段 -
case class Report(id: Long, anotherId: Long)
var reportRDD = rddResult2.flatMap(f => {
val buf = List[Report]()
**rddResult1.collect().toList**.foldLeft(buf)((k, v) => {
val buf1 = new ListBuffer[Report]
buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => {
buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
})
})
})
而如果我更換了大膽的事情並初始化VAL它喜歡 -
val collection = rddResult1.collect().toList
var reportRDD = rddResult2.flatMap(f => {
val buf = List[Report]()
**collection**.foldLeft(buf)((k, v) => {
val buf1 = new ListBuffer[Report]
buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => {
buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
})
})
})
它的工作原理,沒有任何交代?
你得到了什麼錯誤? – maasg
這是與該問題相關的很多代碼。清理它嗎?你會發現在Cassandra中沒有混合的相同行爲,以及'foldLeft'中的複雜性。否則它會提出一個很好的問題! –