你可以做的是讀你的RDD到RDD [行],然後更改成數據幀。我們唯一的問題是我們也需要Schema。所以讓我們分兩步來做。
首先讓我們結合目標
val schema = spark.read.cassandraFormat("dogabase", "test").load.schema
/**
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(owner,StringType,true),
StructField(dog_id,IntegerType,true),
StructField(dog_age,IntegerType,true),
StructField(dog_name,StringType,true))
**/
獲取架構編程然後我們就可以讓org.apache.spark.sql.Row
對象我們卡桑德拉驅動 行。
import org.apache.spark.sql.Row
val joinResult =
sc.parallelize(Seq(Tuple1("Russ")))
.joinWithCassandraTable("test", "dogabase")
.map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row
現在,我們有一個架構和RDD [行]我們可以用火花會議
val dataset = spark.createDataFrame(joinResult, schema)
dataset.show
/**
+-----+------+-------+--------+
|owner|dog_id|dog_age|dog_name|
+-----+------+-------+--------+
| Russ| 1| 10| cara|
| Russ| 2| 11|sundance|
+-----+------+-------+--------+
**/
而只是櫃面你不相信我的createDataFrame方法,一個數據幀是數據集
dataset.getClass
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset
編輯:可能需要的轉換器
一些卡桑德拉類型無效b Spark行的asis,所以你可能需要轉換它們。這可以通過編寫快速轉換功能來完成。不幸的是,SCC使用的內置轉換使內部表示成爲可能,因此我們無法使用這些轉換。
def convertToSpark(element:Any): Any = {
case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate //Convert to java.util.Date
case other => other
}
使你行
cassandraRow.columnValues.map(convertToSpark):_*
這是夢幻般的,也解決了一堆我一直有其他問題時,則...太感謝你了!我會明天實施這個,並會讓你知道我是如何得到的:) –
抱歉再次打擾你 - 這似乎是非常接近工作,除了我的卡桑德拉領域之一是日期,我看到的例外'編碼時出錯:java.lang.RuntimeException:org.joda.time.LocalDate不是日期模式的有效外部類型。 Do'u知道這是否有明顯的修復我失蹤?再次感謝 –
哦星火:)問題是cassandra驅動返回的類型「joda的localdate」與Spark不兼容。所以你需要做的就是將這些LocalDate轉換爲spark兼容類型。我建議您使用內置轉換器的連接器,但這些連接器的目標是內部表示,並且也不允許用於外部源。我將在上面的答案中提供一個轉換類型的代碼示例。 – RussS