2016-07-05 53 views
0

我有很多json文件,但是它們沒有正確格式化爲Spark。我不想編寫代碼,通過對每行中的每個字典進行規範化來專門將它們轉換爲正確的格式。合併來自不良JSON的Spark RDD

相反,我希望使用火花來解析他們的內容。我有以下

import json 

import os 

json_dir = '/data/original/TEMP' 
df = sc.wholeTextFiles(os.path.join(json_dir,'*.json')) 
j_docs = df.map(lambda x: json.loads(x[1])).cache() 

這工作正常,j_docs本質上是一個列表的列表。例如,j_docs中的第一項是來自第一個文件的字典列表。

我想將所有這些單獨的列表合併爲一個大的RDD。理想情況下,無需運行數據收集。

感謝

+1

使用flatMap而不是地圖? – C4stor

+0

是男人!謝謝。 – browskie

回答

1

,而不是使用映射到flatMap也正是:)

+0

謝謝,我可以不收集嗎?如果我嘗試,它會給我一個這樣的結果:'PipelinedRDD'對象不可迭代 – browskie

+0

它給出了什麼?後來我想呢? – C4stor

+0

這個人在flatMap merged = sc.parallelize(j_docs.flatMap(lambda x:x))之後沒有collect()返回它 – browskie