2015-06-21 140 views
I wrote some Spark code, and I have an RDD of key/value pairs in which each value is a `pyspark.resultiterable.ResultIterable`:

[(4, <pyspark.resultiterable.ResultIterable at 0x9d32a4c>), 
(1, <pyspark.resultiterable.ResultIterable at 0x9d32cac>), 
(5, <pyspark.resultiterable.ResultIterable at 0x9d32bac>), 
(2, <pyspark.resultiterable.ResultIterable at 0x9d32acc>)] 

What I need to do is call `distinct` on each `pyspark.resultiterable.ResultIterable`.

I tried this:

def distinctHost(a, b): 
    p = sc.parallelize(b) 
    return (a, p.distinct()) 

mydata.map(lambda x: distinctHost(*x)) 

But I get an error:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The error is self-explanatory: I cannot use `sc` inside a transformation. But I need to find a way to convert each `pyspark.resultiterable.ResultIterable` into something I can call `distinct` on.

Answer


The straightforward approach is to use sets:

from numpy.random import choice, seed

seed(323)

keys = (4, 1, 5, 2)
hosts = [
    u'in24.inetnebr.com',
    u'ix-esc-ca2-07.ix.netcom.com',
    u'uplherc.upl.com',
    u'slppp6.intermind.net',
    u'piweba4y.prodigy.com',
]

# Build random (key, host) pairs and group them by key.
# list() materializes zip's iterator, which Python 3 requires here.
pairs = sc.parallelize(list(zip(choice(keys, 20), choice(hosts, 20)))).groupByKey()

# set(v) materializes each ResultIterable and drops duplicates;
# no SparkContext is needed inside the closure.
pairs.map(lambda kv: (kv[0], set(kv[1]))).take(3)

The result:

[(1, {u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net'}), 
(2, 
    {u'in24.inetnebr.com', 
    u'ix-esc-ca2-07.ix.netcom.com', 
    u'slppp6.intermind.net', 
    u'uplherc.upl.com'}), 
(4, {u'in24.inetnebr.com', u'piweba4y.prodigy.com', u'uplherc.upl.com'})] 
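The reason this works is that a `ResultIterable` is just an ordinary Python iterable, so `set()` can materialize and deduplicate it inside a worker-side closure without ever touching the `SparkContext`. A minimal pure-Python sketch of that step (hypothetical host values, no Spark required):

```python
# A ResultIterable behaves like any ordinary iterable, so set() can
# consume it once and drop duplicates -- exactly what happens per key
# inside the map() closure above.
grouped_value = iter([u'uplherc.upl.com', u'in24.inetnebr.com', u'uplherc.upl.com'])
distinct_hosts = set(grouped_value)
# distinct_hosts now holds the two unique hosts.
```

In PySpark itself, `pairs.mapValues(set)` should express the same idea while leaving the keys untouched.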

If there is a particular reason to use `rdd.distinct`, you can try something like this:

def distinctHost(pairs, key):
    # Filter down to a single key, flatten its grouped values,
    # then deduplicate with the RDD-level distinct().
    return (pairs
            .filter(lambda kv: kv[0] == key)
            .flatMap(lambda kv: kv[1])
            .distinct())

[(key, distinctHost(pairs, key).collect()) for key in pairs.keys().collect()]
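The per-key logic of the list comprehension above can be sketched in pure Python, which makes the intent clearer (the grouped input below is hypothetical and simply mimics the shape that `groupByKey()` produces):

```python
# Pure-Python analogue of "for each key, collect the distinct values".
grouped = [
    (1, [u'slppp6.intermind.net', u'slppp6.intermind.net']),
    (4, [u'uplherc.upl.com', u'in24.inetnebr.com', u'uplherc.upl.com']),
]
# sorted() is only for a deterministic ordering of each set.
distinct_per_key = [(k, sorted(set(v))) for k, v in grouped]
```

Note that the RDD-based version launches one filter/flatMap/distinct job per key, so the set-based approach above is usually the cheaper choice.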

I think sets will do just fine. Thanks! –