如果這僅僅是每個文件一個JSON文檔所有你需要的是SparkContext.wholeTextFiles
。首先,讓我們創建一些虛擬的數據:
import tempfile
import json
input_dir = tempfile.mkdtemp()
docs = [
{'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}},
{'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}},
{'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}]
for doc in docs:
with open(tempfile.mktemp(suffix="json", dir=input_dir), "w") as fw:
json.dump(doc, fw, indent=4)
現在讓我們來讀取數據:
rdd = sc.wholeTextFiles(input_dir).values()
,並確保這些文件是縮進:
print rdd.top(1)[0]
## {
## "data": {
## "text": {
## "de": "Ein Text.",
## "en": "A text."
## }
## }
## }
最後,我們可以解析:
parsed = rdd.map(json.loads)
and check如果一切都按預期:
parsed.takeOrdered(3)
## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}},
## {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}]
如果仍然遇到一些問題,這是最有可能是由於一些畸形的條目。你可以做最簡單的做法是使用flatMap
與定製的包裝丟棄畸形的條目:
rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd)
## org.apache.spark.api.python.PythonException: Traceback (most recent call ...
## ...
## ValueError: Expecting property name: line 1 column 2 (char 1)
,並使用try_seq
纏(這裏定義:What is the equivalent to scala.util.Try in pyspark?)
rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect()
## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}},
## {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]
因此該文件是由多個'json'的每個人都有多行? – Udy
不,它是json文件的文件夾,每個文件的json內部都有一個4的縮進級別.Spark不喜歡它,既不在一個也不在多個文件中。每行一個json - jsonl開箱即用 - 毫無疑問。 – rebeling
你能添加一個文件/行的例子嗎? – Udy