2016-11-27

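RDD to DataFrame with Spark and Couchbase

I created an RDD from a NoSQL database and want to convert it to a DataFrame. I have tried several options, but every one of them results in an error.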

val df = sc.couchbaseQuery(test).map(_.value).collect().foreach(println) 


{"accountStatus":"AccountOpen","custId":"140034"} 
{"accountStatus":"AccountOpen","custId":"140385"} 
{"accountStatus":"AccountClosed","subId":"10795","custId":"139698","subStatus":"Active"} 
{"accountStatus":"AccountClosed","subId":"11364","custId":"140925","subStatus":"Paused"} 
{"accountStatus":"AccountOpen","subId":"10413","custId":"138842","subStatus":"Active"} 
{"accountStatus":"AccountOpen","subId":"10414","custId":"138842","subStatus":"Active"} 
{"accountStatus":"AccountClosed","subId":"11314","custId":"140720","subStatus":"Paused"} 
{"accountStatus":"AccountOpen","custId":"139166"} 
{"accountStatus":"AccountClosed","subId":"10735","custId":"139558","subStatus":"Paused"} 
{"accountStatus":"AccountOpen","custId":"139575"} 
df: Unit =() 

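I tried appending .toDF() to the end of my code, and I also tried creating a schema and calling createDataFrame, but both attempts produce errors. What is the best way to convert the RDD to a DataFrame?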

import org.apache.spark.sql.types._ 

// The schema is encoded in a string 
val schemaString = "accountStatus subId custId subStatus" 

// Generate the schema based on the string of schema 
val fields = schemaString.split(" ") 
    .map(fieldName => StructField(fieldName, StringType, nullable = true)) 
val schema = StructType(fields) 

//

val peopleDF = spark.createDataFrame(df,schema) 

Error

<console>:101: error: overloaded method value createDataFrame with alternatives: 
    (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and> 
    (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and> 
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
cannot be applied to (Unit, org.apache.spark.sql.types.StructType) 
     val peopleDF = spark.createDataFrame(df,schema) 

Another attempt

val df = sc.couchbaseQuery(test).map(_.value).toDF() 

Error

<console>:93: error: value toDF is not a member of org.apache.spark.rdd.RDD[com.couchbase.client.java.document.json.JsonObject] 
     val df1 = sc.couchbaseQuery(test).map(_.value).toDF() 
                ^

Which errors did you receive?

Please post the stack trace!

I have added the errors now. Thanks. – Mark

Answers

In the first example, you assign val df to the result of calling foreach, which has type Unit.
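A minimal plain-Scala sketch of the same effect, with no Spark or Couchbase involved. The `records` sequence is a made-up stand-in for the query results; the point is only that `foreach` returns `Unit`, so nothing usable is left in the variable afterwards:

```scala
// Stand-in data (an assumption); the thread's real source is sc.couchbaseQuery(...)
val records = Seq("""{"custId":"140034"}""", """{"custId":"140385"}""")

// foreach runs a side effect per element and returns Unit,
// so `printed` holds nothing that can be turned into a DataFrame
val printed: Unit = records.foreach(println)

// Keeping the result of a transformation (map) preserves the data for later use
val kept: Seq[String] = records.map(_.trim)
```

The same holds for an RDD: keep the result of `map`, and print separately if needed.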

Remove the calls to collect and foreach, and it should work:

// removed collect().foreach() here: 
val df = sc.couchbaseQuery(test).map(_.value) 
import org.apache.spark.sql.types._ 

// The schema is encoded in a string 
val schemaString = "accountStatus subId custId subStatus" 

// Generate the schema based on the string of schema 
val fields = schemaString.split(" ") 
    .map(fieldName => StructField(fieldName, StringType, nullable = true)) 
val schema = StructType(fields) 
val peopleDF = spark.createDataFrame(df,schema) 
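One caveat: the createDataFrame overload that takes a schema expects an RDD[Row], so an RDD of JsonObject would probably still be rejected by the compiler. The sketch below shows one possible way to build the rows first. It is an illustration under assumptions, not the connector's prescribed method: a Map[String, String] stands in for the Couchbase JsonObject so the example runs without a cluster, and the local SparkSession is only for demonstration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Local session for illustration only; in spark-shell, `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rdd-to-df-sketch")
  .getOrCreate()

val fieldNames = "accountStatus subId custId subStatus".split(" ")
val schema = StructType(
  fieldNames.map(name => StructField(name, StringType, nullable = true)))

// Assumption: Map[String, String] is a stand-in for the Couchbase JsonObject
val records = spark.sparkContext.parallelize(Seq(
  Map("accountStatus" -> "AccountOpen", "custId" -> "140034"),
  Map("accountStatus" -> "AccountClosed", "subId" -> "10795",
      "custId" -> "139698", "subStatus" -> "Active")
))

// Build one Row per record in schema order; absent fields become null.
// With a real JsonObject, obj.getString(name) would take the place of m.get(name).orNull
val rowRDD = records.map { m =>
  Row.fromSeq(fieldNames.map(name => m.get(name).orNull).toSeq)
}

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()
```

Declaring every field as nullable matters here, because some documents carry no subId or subStatus.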

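For the second approach, I suspect Spark SQL does not know how to handle the JsonObject supplied by the Couchbase client, so try mapping the values to String and then using Spark SQL to read the RDD as JSON.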


Thanks @ImDarrenG. I tried the above, but I received an error: overloaded method value createDataFrame with alternatives. – Mark

Try as below. 

val data = spark.sparkContext 
     .couchbaseQuery(N1qlQuery.simple(q), bucket) 
     .map(_.value.toString()) 

spark.read.json(data) 

Spark infers the schema from the Couchbase JSON string itself. 
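
To see the inference without a Couchbase cluster, here is a small sketch that feeds two of the sample documents from the question to spark.read.json. The literal strings and the local SparkSession are assumptions standing in for the real query results. The RDD[String] overload matches the snippet above; newer Spark releases mark it deprecated in favor of a Dataset[String] input.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell, `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-infer-sketch")
  .getOrCreate()

// Assumption: literal strings stand in for .map(_.value.toString()) on the Couchbase RDD
val jsonLines = spark.sparkContext.parallelize(Seq(
  """{"accountStatus":"AccountOpen","custId":"140034"}""",
  """{"accountStatus":"AccountClosed","subId":"10795","custId":"139698","subStatus":"Active"}"""
))

// Spark scans the strings and builds the schema from the keys it finds
val df = spark.read.json(jsonLines)

df.printSchema() // all four fields are inferred as nullable strings
df.show()        // the first row has null for subId and subStatus
```

Fields that appear in only some documents still become columns; rows without them simply hold null.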