2015-11-03 45 views
4

Wanne將一個帶有縮進的json文件讀入RDD,但spark會引發異常。Pyspark textFile json with indentation

# txts = sc.textFile('data/jsons_without_indentation') # works 
txts = sc.textFile('data/jsons_with_indentation')  # fails 
txts_dicts = txts.map(lambda data: json.loads(data)) 
txts_dicts.collect() 

sc.wholeTextFiles也不起作用。是否有可能加載一個JSON縮進沒有首先變成一個文件?

示例JSON文件看起來是這樣的:

{ 
    "data": { 
     "text": { 
      "de": "Ein Text.", 
      "en": "A text." 
     } 
    } 
} 
+1

因此該文件是由多個'json'的每個人都有多行? – Udy

+0

不,它是json文件的文件夾,每個文件的json內部都有一個4的縮進級別.Spark不喜歡它,既不在一個也不在多個文件中。每行一個json - jsonl開箱即用 - 毫無疑問。 – rebeling

+1

你能添加一個文件/行的例子嗎? – Udy

回答

7

如果這僅僅是每個文件一個JSON文檔所有你需要的是SparkContext.wholeTextFiles。首先,讓我們創建一些虛擬的數據:

import tempfile 
import json 

input_dir = tempfile.mkdtemp() 

docs = [ 
    {'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}}, 
    {'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}}, 
    {'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}] 

for doc in docs: 
    with open(tempfile.mktemp(suffix="json", dir=input_dir), "w") as fw: 
     json.dump(doc, fw, indent=4) 

現在讓我們來讀取數據:

rdd = sc.wholeTextFiles(input_dir).values() 

,並確保這些文件是縮進:

print rdd.top(1)[0] 

## { 
##  "data": { 
##   "text": { 
##    "de": "Ein Text.", 
##    "en": "A text." 
##   } 
##  } 
## } 

最後,我們可以解析:

parsed = rdd.map(json.loads) 

and check如果一切都按預期:

parsed.takeOrdered(3) 

## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}, 
## {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}}, 
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}] 

如果仍然遇到一些問題,這是最有可能是由於一些畸形的條目。你可以做最簡單的做法是使用flatMap與定製的包裝丟棄畸形的條目:

rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd) 

## org.apache.spark.api.python.PythonException: Traceback (most recent call ... 
##  ... 
## ValueError: Expecting property name: line 1 column 2 (char 1) 

,並使用try_seq纏(這裏定義:What is the equivalent to scala.util.Try in pyspark?

rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect() 

## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}}, 
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}, 
## {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]