
I am trying out Spark Structured Streaming with Kafka and Python. Requirement: I need to process streaming data from Kafka (in JSON format) in Spark, perform transformations, and then store it in a database, all with Spark Structured Streaming in Python.

I have data in JSON format, like: {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}

I plan to read from Kafka using spark.readStream, like this:

data = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

I used this link for reference, but could not work out how to parse the JSON data. I tried this:

data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)] 

But that does not seem to work.
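For what it's worth, the .as[(Float, Float, String, String)] call is Scala Dataset syntax and has no Python equivalent; moreover, the Kafka source exposes value as binary, so it has to be cast to a string before the JSON inside it can be parsed. A minimal first step in PySpark would be something like:

# The Kafka source delivers `value` as binary; cast it to a readable string.
# Extracting the individual JSON fields out of it is a separate step
# (see the answer below).
raw = data.selectExpr("CAST(value AS STRING)")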

Can anyone who has worked with Spark Structured Streaming in Python guide me to an example or a link?

Using:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

inData = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("append").format("console").start()

The program runs, but I get the following values on the console:

+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
|               [null,null,null,2...|
|               [null,null,null,2...|
+-----------------------------------+

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : {
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : {
      "test" : {
        "0" : 366
      }
    },
    "endOffset" : {
      "test" : {
        "0" : 368
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@..."
  }
}

Am I missing something here?
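As a side note on the output above: the single jsontostruct(...) column can be expanded into named columns by aliasing the parsed struct and selecting its fields. A minimal sketch (the alias parsed is arbitrary):

# Alias the struct produced by from_json, then expand it into
# top-level columns a, b, name, time.
parsed = inData.select(
    from_json(col("value").cast("string"), schema).alias("parsed"))
flat = parsed.select("parsed.*")
query = flat.writeStream.outputMode("append").format("console").start()

This does not by itself explain the null values, but it makes the console output much easier to inspect.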

Answer


You can use from_json with a schema:

from pyspark.sql.functions import col, from_json 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("a", DoubleType()), 
    StructField("b", DoubleType()), 
    StructField("name", StringType()), 
    StructField("time", TimestampType())]) 

data.select(from_json(col("value").cast("string"), schema)) 
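If the parsed columns come back null, as in the console output above, one way to sanity-check the schema outside the stream is to apply the same from_json to a one-row batch DataFrame built from the sample record; a small sketch:

# Hypothetical sanity check: parse the question's sample record as a batch job.
sample = spark.createDataFrame(
    [('{"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}',)],
    ["value"])
sample.select(from_json(col("value"), schema).alias("json")).show(truncate=False)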

Or get the individual fields as strings with get_json_object:

from pyspark.sql.functions import get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

and cast them later according to your needs.
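Finally, since the original requirement was to store the transformed stream in a database: on Spark 2.4+ the usual pattern is foreachBatch with a batch JDBC write. A sketch under those assumptions (the JDBC URL, table, and credentials are placeholders, and parsed stands for the parsed streaming DataFrame from above):

def write_to_db(batch_df, batch_id):
    # Each micro-batch is a normal DataFrame, so the batch JDBC writer applies.
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/mydb")  # placeholder URL
        .option("dbtable", "events")                             # placeholder table
        .option("user", "user")
        .option("password", "password")
        .mode("append")
        .save())

query = (parsed.writeStream
    .foreachBatch(write_to_db)
    .option("checkpointLocation", "/tmp/checkpoints/events")  # recommended for recovery
    .start())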