2017-04-18 18 views
0

更新(04/20/17): 我正在使用Apache Spark 2.1.0,我將使用Python。Python Spark實現map-reduce算法來創建(列,值)元組

我已經縮小了問題範圍,希望有人對Spark有更多的瞭解。我需要從values.csv文件的頭部創建的元組的RDD:

values.csv(主收集的數據,非常大的):

+--------+---+---+---+---+---+----+ 
| ID | 1 | 2 | 3 | 4 | 9 | 11 | 
+--------+---+---+---+---+---+----+ 
|  | | | | | | | 
| abc123 | 1 | 2 | 3 | 1 | 0 | 1 | 
|  | | | | | | | 
| aewe23 | 4 | 5 | 6 | 1 | 0 | 2 | 
|  | | | | | | | 
| ad2123 | 7 | 8 | 9 | 1 | 0 | 3 | 
+--------+---+---+---+---+---+----+ 

輸出(RDD)

+----------+----------+----------+----------+----------+----------+----------+ 
| abc123 | (1;1) | (2;2) | (3;3) | (4;1) | (9;0) | (11;1) | 
|   |   |   |   |   |   |   | 
| aewe23 | (1;4) | (2;5) | (3;6) | (4;1) | (9;0) | (11;2) | 
|   |   |   |   |   |   |   | 
| ad2123 | (1;7) | (2;8) | (3;9) | (4;1) | (9;0) | (11;3) | 
+----------+----------+----------+----------+----------+----------+----------+ 

發生了什麼事我配對的每個值與該格式值的列名:

(column_number, value) 

原始格式(如果你有興趣與它的工作):

id,1,2,3,4,9,11 
abc123,1,2,3,1,0,1 
aewe23,4,5,6,1,0,2 
ad2123,7,8,9,1,0,3 

問題:

的例子values.csv文件只包含幾列,但在實際的文件有成千上萬的專欄。我可以提取標題並將其廣播到分佈式環境中的每個節點,但我不確定這是否是解決問題的最有效方法。是否可以通過並行頭來實現輸出?

回答

1

我想你也可以使用PySpark Dataframe來實現解決方案。但是,我的解決方案尚未達到最佳狀態。我使用split來獲取新的列名和相應的列來執行sum。這取決於你的key_list有多大。如果它太大,這可能無法正常工作,因爲您必須在內存上加載key_list(使用collect)。

import pandas as pd 
import pyspark.sql.functions as func 

# example data 
values = spark.createDataFrame(pd.DataFrame([['abc123', 1, 2, 3, 1, 0, 1], 
              ['aewe23', 4, 5, 6, 1, 0, 2], 
              ['ad2123', 7, 8, 9, 1, 0, 3]], 
              columns=['id', '1', '2', '3','4','9','11'])) 
key_list = spark.createDataFrame(pd.DataFrame([['a', '1'], 
               ['b','2;4'], 
               ['c','3;9;11']], 
               columns=['key','cols'])) 
# use values = spark.read.csv(path_to_csv, header=True) for your data 

key_list_df = key_list.select('key', func.split('cols', ';').alias('col')) 
key_list_rdd = key_list_df.rdd.collect() 
for row in key_list_rdd: 
    values = values.withColumn(row.key, sum(values[c] for c in row.col if c in values.columns)) 
keys = [row.key for row in key_list_rdd] 
output_df = values.select(keys) 

輸出

output_df.show(n=3) 
+---+---+---+ 
| a| b| c| 
+---+---+---+ 
| 1| 3| 4| 
| 4| 6| 8| 
| 7| 9| 12| 
+---+---+---+ 
+0

我不知道這是不是我所此刻在我的代碼更新的更好,因爲你的代碼必須閱讀大量文件到大熊貓據幀,這是沒有分發。我可能是錯的。我更新了我的代碼,以便在RDD中提供解決方案,但是我想知道是否可以改進它,因爲我是Apache Spark的新手,尤其是get_output_row()函數需要傳遞收集的鍵列表版本。 – Dobob

+0

哦,對於閱讀部分,您可以通過直接提供CSV路徑來'spark.read.csv(path_to_csv)'。它會給你PySpark數據幀。 – titipata