2017-09-27 88 views
1

我使用Datastax spark-cassandra-connector訪問Cassandra中的一些數據。Scala加入withCassandra表結果(或CassandraTableScanRDD)到數據集

爲了能夠有效地訪問我需要查詢的所有數據,我必須使用joinWithCassandraTable方法從一堆分區中取回數據。這給了我一個類com.datastax.spark.connector.rdd.CassandraTableScanRDD(或類似的,測試我實際上只是使用標準的sc.cassandraTable(ks, tbl)方法來讀取數據)的對象。

問題是,我需要在結果對象上使用的所有方法都需要類org.apache.spark.sql.Dataset的對象。

我已經做了很多搜索,並且一直沒能找到任何幫助 - 我發現的最接近的是this類似的問題,我不認爲它已經得到了充分的回答,因爲它忽略了使用情況下,訪問所有必要數據的推薦方法是使用joinWithCassandraTable

我也是新來的java和斯卡拉,所以對不起,如果我有點慢。任何幫助都會受到極大的讚賞,因爲我在這一點上很困難。

感謝, AKHIL

回答

2

你可以做的是讀你的RDD到RDD [行],然後更改成數據幀。我們唯一的問題是我們也需要Schema。所以讓我們分兩步來做。

首先讓我們結合目標

val schema = spark.read.cassandraFormat("dogabase", "test").load.schema 

/** 
schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(owner,StringType,true), 
StructField(dog_id,IntegerType,true), 
StructField(dog_age,IntegerType,true), 
StructField(dog_name,StringType,true)) 
**/ 

獲取架構編程然後我們就可以讓org.apache.spark.sql.Row對象我們卡桑德拉驅動 行。

import org.apache.spark.sql.Row 
val joinResult = 
    sc.parallelize(Seq(Tuple1("Russ"))) 
    .joinWithCassandraTable("test", "dogabase") 
    .map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row 

現在,我們有一個架構和RDD [行]我們可以用火花會議

val dataset = spark.createDataFrame(joinResult, schema) 
dataset.show 

/** 
+-----+------+-------+--------+ 
|owner|dog_id|dog_age|dog_name| 
+-----+------+-------+--------+ 
| Russ|  1|  10| cara| 
| Russ|  2|  11|sundance| 
+-----+------+-------+--------+ 
**/ 

而只是櫃面你不相信我的createDataFrame方法,一個數據幀是數據集

dataset.getClass 
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset 

編輯:可能需要的轉換器

一些卡桑德拉類型無效b Spark行的asis,所以你可能需要轉換它們。這可以通過編寫快速轉換功能來完成。不幸的是,SCC使用的內置轉換使內部表示成爲可能,因此我們無法使用這些轉換。

def convertToSpark(element:Any): Any = { 
    case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate //Convert to java.util.Date 
    case other => other 
} 

使你行

cassandraRow.columnValues.map(convertToSpark):_* 
+0

這是夢幻般的,也解決了一堆我一直有其他問題時,則...太感謝你了!我會明天實施這個,並會讓你知道我是如何得到的:) –

+0

抱歉再次打擾你 - 這似乎是非常接近工作,除了我的卡桑德拉領域之一是日期,我看到的例外'編碼時出錯:java.lang.RuntimeException:org.joda.time.LocalDate不是日期模式的有效外部類型。 Do'u知道這是否有明顯的修復我失蹤?再次感謝 –

+0

哦星火:)問題是cassandra驅動返回的類型「joda的localdate」與Spark不兼容。所以你需要做的就是將這些LocalDate轉換爲spark兼容類型。我建議您使用內置轉換器的連接器,但這些連接器的目標是內部表示,並且也不允許用於外部源。我將在上面的答案中提供一個轉換類型的代碼示例。 – RussS