這裏是一些有用的鏈接:https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/a4JND9jiNCY如何在表超過22個字段時使用savetocassandra?
如何在上面的谷歌討論中使用參考代碼? 我不能理解scala代碼,請用一些例子來解釋它嗎?
這裏是一些有用的鏈接:https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/a4JND9jiNCY如何在表超過22個字段時使用savetocassandra?
如何在上面的谷歌討論中使用參考代碼? 我不能理解scala代碼,請用一些例子來解釋它嗎?
通常,您可以創建一個元組,然後調用SaveToCassandra。類似這樣的:
val myRdd = sc.cassandraTable("mykeypace", "mytable")
val myTransformedRdd = myRdd.map {
(myRdd.getString("field1"), myRdd.GetString("field3"))
}
myTransformedRdd.saveToCassandra("mykeyspace", "someothertable", SomeColumns("field1", field3"))
myRdd的類型是RDD [CassandraRow]。 myTransformedRdd的類型是RDD [(string,string)]。
在引擎蓋下,Scala實際上使用了Tuple2 [string,string]。 Scala支持Tuple22。較新版本的Scala支持更多。
如果您有超過22個字段的結構,您可以構建其他類型的RDD。例如,你可以構造一個RDD [CassandraRow]。
如果我把上面的代碼,並把它改爲使用CassandraRow對象,而不是元組,它可能是這樣的:
val myRdd = sc.cassandraTable("mykeypace", "mytable")
//build an array of the column names which we need later to make a CassandraRow object
val allColumnNames = Array[String](
"field1",
"field2"
)
//loop through the column names and create ColumnName objects from them
//we will need this later when we call SomeColumns()
val columnRefs = for(item <- allColumnNames) yield {
new ColumnName(item)
}
val myTransformedRdd = myRdd.map {
//create an Indexed Sequence with all of the values
//we will need this to create the CassandraRow object
val allValues = IndexedSeq[AnyRef](myRdd.GetString("field1"), myRdd.GetString("field3"))
new CassandraRow(allColumnNames, allValues)
}
//the _* syntax tells Scala to take our columnRefs array and pass them into SomeColumns in the correct way
myTransformedRdd.saveToCassandra("mykeyspace", "someothertable", SomeColumns(columnRefs:_*)
這兩段代碼完成同樣的事情,但第二個版本讓你通過您的結構中的超過22個項目