2017-06-23

Adding a stream to an accumulator

The example code in the Spark documentation gives us the following:

>>> from pyspark.accumulators import AccumulatorParam
>>> class VectorAccumulatorParam(AccumulatorParam):
...     def zero(self, value):
...         return [0.0] * len(value)
...     def addInPlace(self, val1, val2):
...         for i in range(len(val1)):
...             val1[i] += val2[i]
...         return val1
>>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
>>> va.value
[1.0, 2.0, 3.0]
>>> def g(x):
...     global va
...     va += [x] * 3
>>> rdd.foreach(g)
>>> va.value
[7.0, 8.0, 9.0]

But if I create a stream and want to add the input from that stream to the accumulator, how would I do that?

It seems you can only add lists or arrays to an accumulator, not DStreams.

Answer


Just call va.add() inside foreachRDD or transform, like this:

from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 3)  # 3-second batch interval


class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0] * len(value)

    def addInPlace(self, val1, val2):
        for i in range(len(val1)):
            val1[i] += val2[i]
        return val1


va = sc.accumulator([0] * 3, VectorAccumulatorParam())

data = range(30)
# Group data into tuples of three; each tuple becomes a one-element RDD (one batch).
lines = ssc.queueStream([sc.parallelize([x]) for x in zip(*[iter(data)] * 3)])

# Alternative: add inside transform and print inside foreachRDD.
# lines.transform(lambda rdd: rdd.foreach(lambda x: va.add(x)) or rdd) \
#     .foreachRDD(lambda x: print("Now Accumulator Value is ({0})".format(va.value)))
lines.transform(lambda rdd: print("Now Accumulator Value is ({0})".format(va.value)) or rdd) \
    .foreachRDD(lambda rdd: rdd.foreach(lambda x: va.add(x)))

lines.pprint()

ssc.start()
ssc.awaitTermination()

Output:

Now Accumulator Value is ([0, 0, 0]) 
-------------------------------------------          
Time: 2017-07-13 17:23:12 
------------------------------------------- 
(0, 1, 2) 

Now Accumulator Value is ([0, 1, 2]) 
------------------------------------------- 
Time: 2017-07-13 17:23:15 
------------------------------------------- 
(3, 4, 5) 

Now Accumulator Value is ([3, 5, 7]) 
------------------------------------------- 
Time: 2017-07-13 17:23:18 
------------------------------------------- 
(6, 7, 8) 

Now Accumulator Value is ([9, 12, 15]) 
------------------------------------------- 
Time: 2017-07-13 17:23:21 
------------------------------------------- 
(9, 10, 11) 

Now Accumulator Value is ([18, 22, 26]) 
-------------------------------------------          
Time: 2017-07-13 17:23:24 
-------------------------------------------
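
To see why each printed value is one batch behind, the batches can be replayed without Spark. This is only a rough sketch: the names add_in_place and replay are made up for illustration, and it assumes that each batch's accumulator update finishes before the next batch's transform function runs, which is what the output above suggests.

```python
def add_in_place(val1, val2):
    # Same element-wise merge as VectorAccumulatorParam.addInPlace.
    for i in range(len(val1)):
        val1[i] += val2[i]
    return val1


def replay(data, width=3):
    # zip(*[iter(data)] * width) pulls from one shared iterator `width`
    # times per tuple, so it yields consecutive groups of `width` items.
    batches = list(zip(*[iter(data)] * width))
    acc = [0] * width   # stands in for va.value
    seen = []           # what the print inside transform would show
    for batch in batches:
        seen.append(list(acc))     # printed before this batch is added
        add_in_place(acc, batch)   # the add done inside foreachRDD
    return batches, seen, acc


batches, seen, final = replay(range(30))
print(batches[:4])
print(seen[:5])
print(final)
```

The first five entries of seen match the "Now Accumulator Value is" lines in the output above. Swapping the two roles, as in the commented-out variant, would print the value after the current batch has been added instead.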