2016-10-12 122 views
0

我使用Spark來並行執行一些數據提取並返回熊貓數據框的一些現有代碼。我想將這些熊貓數據框轉換爲一個或多個Spark數據框。PySpark - 將python數據結構轉換爲執行器上的RDD

Nb。現有的代碼非常複雜(涉及到調用本地庫等),因此將其直接移植到Spark代碼不是一種選擇。

下面的代碼的一個簡單的例子:

import pandas as pd 

def extract_df(s): 
    # Lots of existing code that returns a large pandas dataframe 
    # ... 
    return pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]}) 

sRDD = spark.sparkContext.parallelize(['A', 'B', 'C']) 
dfsRDD = sRDD.map(lambda s: extract_df(s)) 

我知道我可以通過收集對駕駛員datesRDD轉換成星火數據幀。

spark.createDataFrame(pd.concat(rdd.collect(), ignore_index=True)).show() 

但這當然要求我可以將整個Pandas數據框的集合保存在內存中,而我不能。目前,我在S3上將熊貓數據框寫入json,然後使用Spark讀取,但這是使用存儲的批次

有什麼方法可以告訴Spark將其轉換爲執行程序本身的DataFrame/RDD?還是有另一種我錯過的方法?

回答

0

不錯,flatMap來救援!

import pandas as pd 

def extract_df(s): 
    # Lots of existing code that returns a **huge** pandas dataframe 
    # ... 
    df = pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]}) 
    return df.values.tolist() 

datesRDD = spark.sparkContext.parallelize(['A', 'B', 'C']) 

dfsRDD = datesRDD.flatMap(lambda s: extract_df(s)) 

spark.createDataFrame(dfsRDD, schema=['x', 'y', 'z']).show() 

+---+---+---+ 
| x| y| z| 
+---+---+---+ 
| A| 1| 4| 
| A| 2| 5| 
| A| 3| 6| 
| B| 1| 4| 
| B| 2| 5| 
| B| 3| 6| 
| C| 1| 4| 
| C| 2| 5| 
| C| 3| 6| 
+---+---+---+