0
我在寫一個Spark作業,它接收來自多個源的數據,過濾不良的輸入行並輸出稍微修改後的輸入版本。該作業有兩個附加要求:Spark累加器混淆
- 我必須跟蹤每個來源的錯誤輸入行數,以通知這些上游提供商。
- 我必須支持每個源的輸出限制。
工作看起來很簡單,我使用累加器來跟蹤問題,以跟蹤每個源過濾的行數。但是,當我執行最後的.limit(N)
時,我的累加器行爲發生了變化。下面是觸發對單一來源的行爲的一些條紋下來示例代碼:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from random import randint
def filter_and_transform_parts(rows, filter_int, accum):
for r in rows:
if r[0] == filter_int:
accum.add(1)
continue
yield r[0], r[1] + 1, r[2] + 1
def main():
spark= SparkSession \
.builder \
.appName("Test") \
.getOrCreate()
sc = spark.sparkContext
accum = sc.accumulator(0)
# 20 inputs w/ tuple having 4 as first element
inputs = [(4, randint(1, 10), randint(1, 10)) if x % 5 == 0 else (randint(6, 10), randint(6, 10), randint(6, 10)) for x in xrange(100)]
rdd = sc.parallelize(inputs)
# filter out tuples where 4 is first element
rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum))
# if not limit, accumulator value is 20
# if limit and limit_count <= 63, accumulator value is 0
# if limit and limit_count >= 64, accumulator value is 20
limit = True
limit_count = 63
if limit:
rdd = rdd.map(lambda r: Row(r[0], r[1], r[2]))
df_schema = StructType([StructField("val1", IntegerType(), False),
StructField("val2", IntegerType(), False),
StructField("val3", IntegerType(), False)])
df = spark.createDataFrame(rdd, schema=df_schema)
df = df.limit(limit_count)
df.write.mode("overwrite").csv('foo/')
else:
rdd.saveAsTextFile('foo/')
print "Accum value: {}".format(accum.value)
if __name__ == "__main__":
main()
的問題是,我的蓄能器有時會報告過濾的行數,有時並不取決於限制規定,並投入數爲一個來源。但是,在所有情況下,被過濾的行都不會進入輸出,這意味着過濾器發生了,並且累加器應該有一個值。
如果您可以對此有所瞭解,那會非常有幫助,謝謝!
更新:
- 添加
rdd.persist()
調用後mapPartitions
作出累加器行爲是一致的。
感謝您的鏈接到相關的文檔。儘管如此,我仍然有點困惑,你是否發現以下陳述是正確的? saveAsTextFile是一個動作,並觸發我的上游累加器正確填充。 df.write不是一個動作,也不會對累加器作任何保證(這會導致它根據極限有時填充的情況) – cbrown