5

我使用的Spark 2.2,我想讀卡夫卡的JSON消息行,它們變換爲DataFrame並將它們作爲一個:這個我jsontostructs在火花結構流

spark 
    .readStream() 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .option("subscribe", "topic") 
    .load() 
    .select(col("value").cast(StringType).as("col")) 
    .writeStream() 
    .format("console") 
    .start(); 

可以實現:

+--------------------+ 
|     col| 
+--------------------+ 
|{"myField":"somet...| 
+--------------------+ 

我想要更多的東西是這樣的:

+--------------------+ 
|    myField| 
+--------------------+ 
|"something"   | 
+--------------------+ 

我試着用struct使用from_json功能:

DataTypes.createStructType(
    new StructField[] { 
      DataTypes.createStructField("myField", DataTypes.StringType) 
    } 
) 

,但我只得到:

+--------------------+ 
| jsontostructs(col)| 
+--------------------+ 
|[something]   | 
+--------------------+ 

然後我試圖使用explode但我只拿到了異常說:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(... 

任何想法如何使這項工作?

回答

4

你幾乎在那裏,只要選擇正確的事情。 from_json返回與該模式匹配的struct列。如果架構(JSON表示)是這樣的:

{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]} 

你會得到嵌套的對象等同於:

root 
|-- jsontostructs(col): struct (nullable = true) 
| |-- myField: string (nullable = false) 

可以使用getField(或getItem)方法來選擇特定領域

df.select(from_json(col("col"), schema).getField("myField").alias("myField")); 

.*選擇struct中的所有頂級字段:

df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*"); 

雖然單stringget_json_object應該是綽綽有餘:

df.select(get_json_object(col("col"), "$.myField"));