2017-07-18 19 views
1

有沒有辦法將行轉換爲JSON在foreachPartition中? 我查看過How to convert Row to json in Spark 2 Scala。 但是,這種方法不會工作,因爲我不能從foreachPartition中訪問sqlContext,而且我的數據也包含嵌套類型。如何將行轉換爲foreachPartition中的JSON?

dataframe.foreachPartition { partitionOfRecords => 

    .. 
    val connectionString: ConnectionStringBuilder = new ConnectionStringBuilder(
       eventHubsNamespace, 
       eventHubName, 
       policyName, 
       policyKey) 

    val eventHubsClient: EventHubClient = EventHubClient.createFromConnectionString(connectionString.toString()).get() 

    val json = /* CONVERT partitionOfRecords to JSON */ 

    val bytes = json.getBytes() 
    val eventData = new EventData(bytes) 
    eventHubsClient.send(eventData) 
    } 

回答

1

我強烈推薦前foreachPartition做轉換爲JSON

原因是在functions對象中存在對JSON的內置支持,您可以使用該對象使用to_json函數(不返回相當複雜的編碼)來構建「字符串化」的JSON。

to_json(E:柱):柱含有StructTypeStructTypesArrayType成JSON字符串是否以指定模式的列轉換。

我建議你做以下幾點:

dataframe. 
    select(to_json($"your-struct-column-here")). 
    as[String]. 
    foreachPartition { json: String => ... }