2017-09-05 124 views
2

我有一些包含JSON對象的文本文件(每行一個對象)。示例:根據某個鍵值(pyspark)從RDD創建多個Spark DataFrame

{"a": 1, "b": 2, "table": "foo"} 
{"c": 3, "d": 4, "table": "bar"} 
{"a": 5, "b": 6, "table": "foo"} 
... 

我想根據表名稱將文本文件的內容解析到Spark DataFrame中。所以在上面的例子中,我將有一個「foo」的DataFrame和「bar」的另一個DataFrame。我已儘可能JSON的線分組爲一個RDD的內部列出與以下(pyspark)代碼:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*")) 
tables_rdd = text_rdd.groupBy(lambda x: json.loads(x)['table']) 

這產生含有元組的列表與以下結構的RDD:

RDD[("foo", ['{"a": 1, "b": 2, "table": "foo"}', ...], 
    ("bar", ['{"c": 3, "d": 4, "table": "bar"}', ...]] 

如何將此RDD分解爲每個表鍵的DataFrame?

編輯:我試圖澄清上面有一個單一的文件中包含多個表中的信息行。我知道我可以在我創建的「groupBy」RDD上調用.collectAsMap,但我知道這會在我的驅動程序上佔用相當數量的RAM。我的問題是:有沒有辦法在不使用.collectAsMap的情況下將「groupBy」RDD分成多個DataFrame?

回答

3

可以有效地將其切分爲拼花分區: 首先,我們將它轉​​換成數據幀:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*")) 
df = spark.read.json(text_rdd) 
df.printSchema() 
    root 
    |-- a: long (nullable = true) 
    |-- b: long (nullable = true) 
    |-- c: long (nullable = true) 
    |-- d: long (nullable = true) 
    |-- table: string (nullable = true) 

現在我們可以把它寫:

df.write.partitionBy('table').parquet([output directory name]) 

如果您列出的內容[output directory name],您將會看到與table不同的值:

hadoop fs -ls [output directory name] 

    _SUCCESS 
    table=bar/ 
    table=foo/ 

如果你想只保留每個表的列,你可以這樣做(假設列的完整列表出現每當表在文件中出現)

import ast 
from pyspark.sql import Row 
table_cols = spark.createDataFrame(text_rdd.map(lambda l: ast.literal_eval(l)).map(lambda l: Row(
     table = l["table"], 
     keys = sorted(l.keys()) 
    ))).distinct().toPandas() 
table_cols = table_cols.set_index("table") 
table_cols.to_dict()["keys"] 

    {u'bar': [u'c', u'd', u'table'], u'foo': [u'a', u'b', u'table']} 
0

步驟如下:

  1. 地圖每個文本字符串JSON。

    jsonRdd = sc.textFile(os.path.join("/path/to/data", "*")).map (.....) 
    
  2. 獲取所有不同的表名給驅動程序。

    tables = jsonRdd.map(<extract table name only from json object >).distinct().collect() 
    
  3. 迭代通過各(步驟2)的表並過濾主要jsonRdd來創建單個表RDD。

    tablesRDD=[] 
    for table in tables: 
        # categorize each main rdd record based on table name. 
        # Compare each json object table element with for loop table string and on successful match return true. 
        output.append(jasonRdd.filter(lambda jsonObj: jsonObj['table'] == table)) 
    

我不是Python開發如此精確的代碼片段可能無法正常工作的。

相關問題