1
爲了後期處理,我需要將批量數據累積一段時間。我正在使用Spark 1.6.3。
我需要積累他們的形式(tag, [[time, value],..]
)。 到目前爲止,我已經試過updateStateByKey
:被添加將批次數據保存在pyspark中
time = [0]
def updateFunc(new_values, last_sum,time):
time[0] += 5
if time == 10:
time = 0
return None
return (last_sum or []) + new_values
data = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, ['t','t1'])) \
.updateStateByKey(lambda x,y :updateFunc(x,y,time))
data.pprint()
數據。但是,嘗試在10秒後刷新數據不起作用。 (我這樣做是錯誤的方式)
此外,我曾嘗試使用window
:
data= lines.flatMap(lambda lime: line.split(' ')\
.map(lambda tag: (tag: ['time', 'value']))\
.window(10, 2)\
.reduceByKey(lambda x,y : y + x)`
但是,這會產生一個一維長的名單。哪個沒用。 任何線索?謝謝。