2016-02-20 40 views
1

我有一個很大的json行(行)數據集。這些行有幾個字段,並且存在的字段取決於該行中的一個json字段。這裏有一個小例子:根據pyspark中的鍵有效地推斷數據幀架構

%pyspark 
data = sc.parallelize([{'key':'k1','a':1.0,'b':2.0}, 
        {'key':'k1','a':1.0,'b':20.0}, 
        {'key':'k1','a':100.0,'b':.2}, 
        {'key':'k2','y':10.0,'z':20.0}, 
        {'key':'k2','y':1.0,'z':250.0}, 
        {'key':'k1','a':1.0,'b':2.0},], 2) 

我的目標是將這些數據放到Dataframe中,而不必指定模式。 Pyspark至少有兩個功能來幫助解決這個問題:1)toDF(),它只將第一行數據作爲模式,2)sqlContext.createDataFrame(),您可以在其中指定要採樣的行的比例來推斷模式。例如: -

data.toDF().show() 
+-----+----+---+ 
| a| b|key| 
+-----+----+---+ 
| 1.0| 2.0| k1| 
| 1.0|20.0| k1| 
|100.0| 0.2| k1| 
| null|null| k2| 
| null|null| k2| 
| 1.0| 2.0| k1| 
+-----+----+---+ 

sqlContext.createDataFrame(data,samplingRatio=1).show() 
+-----+----+---+----+-----+ 
| a| b|key| y| z| 
+-----+----+---+----+-----+ 
| 1.0| 2.0| k1|null| null| 
| 1.0|20.0| k1|null| null| 
|100.0| 0.2| k1|null| null| 
| null|null| k2|10.0| 20.0| 
| null|null| k2| 1.0|250.0| 
| 1.0| 2.0| k1|null| null| 
+-----+----+---+----+-----+ 

sqlContext.createDataFrame()我想要做什麼,但因爲我只有4個十億行也許五個鍵,我想一定有推斷架構更快的方法。此外,一些鑰匙是非常罕見的,所以我不能放棄讓更小的samplingRatio

有沒有一個優雅而快速的方式來推斷模式,因爲只有幾種行類型?

+1

首先使用字典來推斷架構已被棄用在1.3.0所以它不是一個好的選擇。一般來說,如果你不想自己提供模式,你不可能比完整掃描更好。試想一下 - 即使你檢查N-1個元素,也不能保證第N個元素不會包含額外的字段。 – zero323

+0

有關辭典模式推斷的建議已被棄用。鑑於此,什麼是更好的方法? 我發佈了一個答案,根據'key'鍵的值使用了關於模式唯一性的邊信息,這似乎是使其更快的技巧。 –

+1

如果輸入實際上是JSON,那麼將數據直接傳遞給JSON閱讀器,而無需通過Python進行管道傳輸。或者傳遞模式。它不僅可以作爲性能增強器,而且也可以用於驗證。 – zero323

回答

1

多一點谷歌搜索導致我的解決方案。

開始通過創建一個強大的數據框連接符(unionAll不能合併模式):

def addEmptyColumns(df, colNames): 
    exprs = df.columns + ["null as " + colName for colName in colNames] 
    return df.selectExpr(*exprs) 


def concatTwoDfs(left, right): 
    # append columns from right df to left df 
    missingColumnsLeft = set(right.columns) - set(left.columns) 
    left = addEmptyColumns(left, missingColumnsLeft) 

    # append columns from left df to right df 
    missingColumnsRight = set(left.columns) - set(right.columns) 
    right = addEmptyColumns(right, missingColumnsRight) 

    # let's set the same order of columns 
    right = right[left.columns] 

    # finally, union them 
    return left.unionAll(right) 


def concat(dfs): 
    return reduce(concatTwoDfs, dfs) 

(從https://lab.getbase.com/pandarize-spark-dataframes/代碼)

然後拿到不同的鑰匙,讓dataframes的列表,並連接他們:

keys = data.map(lambda x: x['key']).distinct().collect() 

a_grp = [data.filter(lambda x: x['key']==k).toDF() for k in keys] 

concat(a_grp).show() 

+-----+----+---+----+-----+ 
| a| b|key| y| z| 
+-----+----+---+----+-----+ 
| 1.0| 2.0| k1|null| null| 
| 1.0|20.0| k1|null| null| 
|100.0| 0.2| k1|null| null| 
| 1.0| 2.0| k1|null| null| 
| null|null| k2|10.0| 20.0| 
| null|null| k2| 1.0|250.0| 
+-----+----+---+----+-----+ 
相關問題