2015-03-31 31 views
7

我有一堆組合鍵和值的形式。例如,如何通過火花中的多個鍵組合?

tfile.collect() = [(('id1','pd1','t1'),5.0), 
    (('id2','pd2','t2'),6.0), 
    (('id1','pd1','t2'),7.5), 
    (('id1','pd1','t3'),8.1) ] 

我想在此集合,在那裏我可以聚集基於ID [1..1]或PD [1..1]的信息執行SQL一樣操作。我想實現使用vanilla pyspark apis而不是使用SQLContext。 在我目前的實現中,我從一堆文件中讀取併合並RDD。

def readfile(): 
    fr = range(6,23) 
    tfile = sc.union([sc.textFile(basepath+str(f)+".txt") 
         .map(lambda view: set_feature(view,f)) 
         .reduceByKey(lambda a, b: a+b) 
         for f in fr]) 
    return tfile 

我打算創建一個聚合數組作爲一個值。例如,

agg_tfile = [((id1,pd1),[5.0,7.5,8.1])] 

其中5.0,7.5,8.1表示[t1,t2,t3]。我目前,通過使用字典的香草python代碼實現相同。它適用於較小的數據集。但我擔心,因爲這可能無法擴展到更大的數據集。有沒有一種有效的方式使用pispark apis來實現同樣的效果?

+0

而不是使用' union',因爲'wholeTextFiles'調用加載所有文件更高效(如果它存在於PySpark中)。 – 2015-04-09 11:51:03

+0

這是一個Scala [聚合減少多(http://dmtolpeko.com/2015/02/12/multi-column-key-and-value-reduce-a-tuple-in-spark/)和一個Python [聯合減少多個值]( http://stackoverflow.com/questions/30895033/spark-use-reducebykey-instead-of-groupbykey-and-mapbyvalues) – ecoe 2015-11-24 13:38:01

回答

13

我的猜測是你想根據多個字段轉置數據。

一種簡單的方法是連接您要分組的目標字段,並使其成爲配對RDD中的密鑰。例如:

lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1']) 
rdd = lines.map(lambda x: x.split(',')).map(lambda x: (x[0] + ', ' + x[1], x[3])).reduceByKey(lambda a, b: a + ', ' + b) 
print rdd.collect() 

然後你會得到轉置的結果。

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')] 
+0

這絕對是解決這個問題的有趣方式。我想到了另一種實現相同的方法。但我想,你的方法可能比我的要快得多。我也分享我自己的解決方案。 – Rahul 2015-04-08 20:09:42

+0

PySpark沒有'groupByKey'嗎? – 2015-04-09 11:49:56

+0

PySpark的方法[groupBykey](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html)。但是,這個問題傾向於根據兩個字段對記錄進行分組,而不是進行諸如「SELECT sum(value)FROM data GROUP BY id,pd'這樣的聚合。所以'groupBykey'可能沒有幫助。 – dapangmao 2015-04-09 18:01:26

2

我分組((ID1,T1),((p1,5.0),(p2,6.0))等等...我的地圖功能。後來,我降低使用map_group它創建一個數組爲[P1,P2,......],並在各自的崗位填補值。

def map_group(pgroup): 
    x = np.zeros(19) 
    x[0] = 1 
    value_list = pgroup[1] 
    for val in value_list: 
     fno = val[0].split('.')[0] 
     x[int(fno)-5] = val[1] 
    return x 

tgbr = tfile.map(lambda d: ((d[0][0],d[0][2]),[(d[0][1],d[1])])) \ 
       .reduceByKey(lambda p,q:p+q) \ 
       .map(lambda d: (d[0], map_group(d))) 

這並不覺得自己在計算方面昂貴的解決方案,但適用於現在。