2017-03-28 98 views
0

我在寫一個Spark作業,它接收來自多個源的數據,過濾不良的輸入行並輸出稍微修改後的輸入版本。該作業有兩個附加要求:Spark累加器混淆

  • 我必須跟蹤每個來源的錯誤輸入行數,以通知這些上游提供商。
  • 我必須支持每個源的輸出限制。

工作看起來很簡單,我使用累加器來跟蹤問題,以跟蹤每個源過濾的行數。但是,當我執行最後的.limit(N)時,我的累加器行爲發生了變化。下面是觸發對單一來源的行爲的一些條紋下來示例代碼:

from pyspark.sql import Row, SparkSession 
from pyspark.sql.types import * 
from random import randint 

def filter_and_transform_parts(rows, filter_int, accum): 
    for r in rows: 
     if r[0] == filter_int: 
      accum.add(1) 
      continue 

     yield r[0], r[1] + 1, r[2] + 1 

def main(): 
    spark= SparkSession \ 
      .builder \ 
      .appName("Test") \ 
      .getOrCreate() 

    sc = spark.sparkContext 
    accum = sc.accumulator(0) 

    # 20 inputs w/ tuple having 4 as first element                                               
    inputs = [(4, randint(1, 10), randint(1, 10)) if x % 5 == 0 else (randint(6, 10), randint(6, 10), randint(6, 10)) for x in xrange(100)] 

    rdd = sc.parallelize(inputs) 
    # filter out tuples where 4 is first element                                               
    rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum)) 


    # if not limit, accumulator value is 20                                                
    # if limit and limit_count <= 63, accumulator value is 0                                            
    # if limit and limit_count >= 64, accumulator value is 20                                            
    limit = True 
    limit_count = 63 

    if limit: 
     rdd = rdd.map(lambda r: Row(r[0], r[1], r[2])) 
     df_schema = StructType([StructField("val1", IntegerType(), False), 
           StructField("val2", IntegerType(), False), 
           StructField("val3", IntegerType(), False)]) 
     df = spark.createDataFrame(rdd, schema=df_schema) 
     df = df.limit(limit_count) 
     df.write.mode("overwrite").csv('foo/') 
    else: 
     rdd.saveAsTextFile('foo/') 

    print "Accum value: {}".format(accum.value) 

if __name__ == "__main__": 
    main() 

的問題是,我的蓄能器有時會報告過濾的行數,有時並不取決於限制規定,並投入數爲一個來源。但是,在所有情況下,被過濾的行都不會進入輸出,這意味着過濾器發生了,並且累加器應該有一個值。

如果您可以對此有所瞭解,那會非常有幫助,謝謝!

更新

  • 添加rdd.persist()調用後mapPartitions作出累加器行爲是一致的。

回答

0

其實,limit_count的值是什麼並不重要。

之所以有時Accum value爲0,是因爲你performe 蓄電池轉換(例如:rdd.map,rdd.mapPartitions)。 (:rdd.foreach如)

讓我們做出改變你的代碼一點點:

from pyspark.sql import * 
from random import randint 


def filter_and_transform_parts(rows, filter_int, accum): 
    for r in rows: 
     if r[0] == filter_int: 
      accum.add(1) 


def main(): 
    spark = SparkSession.builder.appName("Test").getOrCreate() 

    sc = spark.sparkContext 
    print(sc.applicationId) 
    accum = sc.accumulator(0) 

    inputs = [(4, x * 10, x * 100) if x % 5 == 0 else (randint(6, 10), x * 10, x * 100) for x in xrange(100)] 
    rdd = sc.parallelize(inputs) 
    rdd.foreachPartition(lambda r: filter_and_transform_parts(r, 4, accum)) 

    limit = True 
    limit_count = 10 or 'whatever' 

    if limit: 
     rdd = rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2])) 
     df = spark.createDataFrame(rdd) 
     df = df.limit(limit_count) 
     df.write.mode("overwrite").csv('file:///tmp/output') 
    else: 
     rdd.saveAsTextFile('file:///tmp/output') 

    print "Accum value: {}".format(accum.value) 


if __name__ == "__main__": 
    main() 

總積累值

,在累的作品,以及內部 行動

星火只擔保一直等於20

欲瞭解更多信息:

http://spark.apache.org/docs/2.0.2/programming-guide.html#accumulators

+0

感謝您的鏈接到相關的文檔。儘管如此,我仍然有點困惑,你是否發現以下陳述是正確的? saveAsTextFile是一個動作,並觸發我的上游累加器正確填充。 df.write不是一個動作,也不會對累加器作任何保證(這會導致它根據極限有時填充的情況) – cbrown