我遇到了Spark Scala腳本的一個小問題。基本上我有原始數據,我正在做分組和計數等聚合後,我想將輸出保存爲特定的JSON格式。Spark Dataframe架構定義使用帶案例類和列名別名的反射
編輯:
我試圖簡化的問題,並改寫它:
當我選擇與Array[org.apache.spark.sql.Column]
其中的列名有別名,然後使用列名從源數據幀的數據(或當試圖將行映射到case類時,我得到一個「Task not serializable」異常。
var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")
val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name:String)
val r = dm2.map(row => Result(row.getAs(n))).toDF
,第二部分或問題,我實際需要的最終模式是這些Result
類對象的數組。我還沒有想出,如何做到這一點。預期的結果應該有一個這樣的模式:
case class Test(var FilteredStatistics: Array[Result])
val t = Test(Array(Result("Anna"), Result("James")))
val t2 = sc.parallelize(Seq(t)).toDF
scala> t2.printSchema
root
|-- FilteredStatistics: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
TL; DR:
如何數據幀行映射到一個案例類對象時,數據框列有別名和變量用於列名?
如何將這些case類對象添加到數組?
序列化問題沒有重現 - 我複製了所有的代碼,它適用於我。看起來像代碼中的某個地方(不粘貼在這裏?),您正在使用DataFrame中使用的Case類中的org.apache.spark.sql.Column對象,或者在序列化併發送給工作人員的轉換中使用... –
順便說一句 - 我們中的一個可能會迷失在這個非常複雜的問題的細節中......嘗試最小化它(很多) - 找到再現問題的最簡單的例子(在類似的最小化後分別詢問另一個問題) –
one修復嘗試你的序列化問題...... class Result(???)extends Serializable;對象結果{def apply(r:Row):結果= r匹配{??? }}然後在r上使用模式匹配來處理你在DF中可能有的各種格式。當你試圖將一個類應用到行的一部分時,通常會遇到問題,但是如果創建了一個可以映射整行的類......那麼'DF.map(Result)'可能會起作用。 – kmh