2015-12-29 56 views

回答

1

通常,您可以創建一個元組,然後調用SaveToCassandra。類似這樣的:

val myRdd = sc.cassandraTable("mykeypace", "mytable") 
val myTransformedRdd = myRdd.map { 
    (myRdd.getString("field1"), myRdd.GetString("field3")) 
} 
myTransformedRdd.saveToCassandra("mykeyspace", "someothertable", SomeColumns("field1", field3")) 

myRdd的類型是RDD [CassandraRow]。 myTransformedRdd的類型是RDD [(string,string)]。

在引擎蓋下,Scala實際上使用了Tuple2 [string,string]。 Scala支持Tuple22。較新版本的Scala支持更多。

如果您有超過22個字段的結構,您可以構建其他類型的RDD。例如,你可以構造一個RDD [CassandraRow]。

如果我把上面的代碼,並把它改爲使用CassandraRow對象,而不是元組,它可能是這樣的:

val myRdd = sc.cassandraTable("mykeypace", "mytable") 

//build an array of the column names which we need later to make a CassandraRow object 
val allColumnNames = Array[String](
    "field1", 
    "field2" 
) 

//loop through the column names and create ColumnName objects from them 
//we will need this later when we call SomeColumns() 
val columnRefs = for(item <- allColumnNames) yield { 
    new ColumnName(item) 
} 

val myTransformedRdd = myRdd.map { 

    //create an Indexed Sequence with all of the values 
    //we will need this to create the CassandraRow object 
    val allValues = IndexedSeq[AnyRef](myRdd.GetString("field1"), myRdd.GetString("field3")) 


    new CassandraRow(allColumnNames, allValues) 
} 

//the _* syntax tells Scala to take our columnRefs array and pass them into SomeColumns in the correct way 
myTransformedRdd.saveToCassandra("mykeyspace", "someothertable", SomeColumns(columnRefs:_*) 

這兩段代碼完成同樣的事情,但第二個版本讓你通過您的結構中的超過22個項目

相關問題