2016-09-19 128 views
0

數據將是一個線後產生火花數據幀分隔JSON字符串象下面如何從卡夫卡隊列直接從卡夫卡隊列讀取數據

{「頭」:{「平臺」:「大氣壓」,「MSGTYPE 「:」 1" , 「版本」: 「1.0」}, 「詳細信息」:[{ 「BCC」: 「5814」, 「DSRC」: 「A」, 「輔助」: 「5678」},{ 「BCC」 :「5814」,「dsrc」:「A」,「mid」:「0003」},{「bcc」:「5812」,「dsrc」:「A」,「mid」:「0006」}]}

{「header」:{「platform」:「atm」,「msgtype」:「1」,「version」:「1.0」},「details」:[{「bcc」:「5814」,「dsrc 「:」 A」, 「輔助」: 「1234」},{ 「BCC」: 「5814」, 「DSRC」: 「A」, 「中間」: 「0004」},{ 「BCC」: 「5812」, 「dsrc」:「A」,「mid」:「0009」}]}

{「header」:{「platform」:「atm 」, 「信息類型」: 「1」, 「版本」: 「1.0」}, 「詳細信息」:[{ 「BCC」: 「5814」, 「DSRC」: 「A」, 「輔助」: 「1234」}, { 「BCC」: 「5814」, 「DSRC」: 「A」, 「中間」: 「0004」},{ 「BCC」: 「5812」, 「DSRC」: 「A」, 「中間」: 「0009」 }]}

我們如何在python中爲上述輸入創建一個數據框?我有很多列訪問上面只是一個示例,數據總共有23列。任何幫助,將不勝感激。

回答

0

您正在尋找pyspark.sql.SQLContext.jsonRDD。由於Spark流是批量生成的,因此您的流對象將返回一系列RDD,每個RDD都可以通過jsonRDD生成DF。

+0

@佩德羅席爾瓦謝謝!我嘗試運行下面的代碼,但得到錯誤sc = SparkContext() sqlContext = SQLContext(sc) df = sqlContext.jsonRDD(「/ nro0/app/spark/examples/src/main/resources/input.json 「) df.show()AttributeError:'str'對象沒有屬性'mapPartitions' –

+0

'jsonRDD'需要一個'RDD'。如果你想讀一個文件,你需要'sqlContext.read.json'。 –