2017-06-22 35 views
1

我在閱讀databricks博客link 我發現內置函數to_json有問題。 在代碼本教程中自爆,則返回錯誤:to_json不能在selectExpr中使用spark

org.apache.spark.sql.AnalysisException: Undefined function: 'to_json'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.

這是否意味着這種用法在本教程是錯誤的?並且在selectExpr中不能使用udf。我可以做一些像註冊這個to_json函數到默認數據庫嗎?

val deviceAlertQuery = notifydevicesDS 
         .selectExpr("CAST(dcId AS STRING) AS key", "to_json(struct(*)) AS value") 
         .writeStream 
         .format("kafka") 
         .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
         .option("toipic", "device_alerts") 
         .start() 

回答

0

根據我從郵件列表中獲得的信息。這個函數不會從Spark 2.2.0添加到SQL中。這是提交鏈接:commit。 希望這會有所幫助。 THX Hyukjin Kwon和Burak Yavuz。

1

您需要improt的to_json功能

import org.apache.spark.sql.functions.to_json 

這應該工作而不是selectExpr

data.withColumn("key", $"dcId".cast("string")) 
    .select(to_json(struct(data.columns.head, data.columns.tail:_*)).as("value")).show() 

你也必須使用火花2.x的

我希望這個幫助解決你的問題。

+0

我試過但沒有工作。任何想法? –

+0

你是否試圖將所有列更改爲json作爲一列並寫入? –

+0

更新了答案:) –