6

我想使用基於DataFrame/Dataset API的Spark-Streaming使用structured streaming approach從Kafka加載數據流。如何使用結構化流從Kafka讀取JSON格式的記錄?

我使用:

  • 火花2.10
  • 卡夫卡0.10
  • 火花-SQL卡夫卡-0-10

火花卡夫卡數據源定義了底層模式:

|key|value|topic|partition|offset|timestamp|timestampType| 

我的數據以json格式並且它們存儲在的值列中。我正在尋找一種方法,如何從值列中提取底層模式並將接收到的數據幀更新爲存儲在中的列?我嘗試下面的方法,但它不工作:

val columns = Array("column1", "column2") // column names 
val rawKafkaDF = sparkSession.sqlContext.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers","localhost:9092") 
    .option("subscribe",topic) 
    .load() 
    val columnsToSelect = columns.map(x => new Column("value." + x)) 
    val kafkaDF = rawKafkaDF.select(columnsToSelect:_*) 

    // some analytics using stream dataframe kafkaDF 

    val query = kafkaDF.writeStream.format("console").start() 
    query.awaitTermination() 

在這裏我得到異常org.apache.spark.sql.AnalysisException: Can't extract value from value#337;,因爲在創建流的時候,裏面的值是不知道......

你有什麼建議?

回答

6

從Spark的角度來看value只是一個字節序列。它沒有關於序列化格式或內容的知識。爲了能夠提取字段,你必須先解析它。

如果數據序列化爲JSON字符串,則有兩個選項。使用get_json_object由路徑提取物領域

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.from_json 

val schema: StructType = StructType(Seq(
    StructField("column1", ???), 
    StructField("column2", ???) 
)) 

rawKafkaDF.select(from_json($"value".cast(StringType), schema)) 

castStringType:您可以castvalueStringType和使用from_json,並提供一個架構

import org.apache.spark.sql.functions.get_json_object 

val columns: Seq[String] = ??? 

val exprs = columns.map(c => get_json_object($"value", s"$$.$c")) 

rawKafkaDF.select(exprs: _*) 

cast後來到所需的類型。

相關問題