2016-12-01 46 views
1

爲了後期處理,我需要將批量數據累積一段時間。我正在使用Spark 1.6.3。
我需要積累他們的形式(tag, [[time, value],..])。 到目前爲止,我已經試過updateStateByKey:被添加將批次數據保存在pyspark中

time = [0] 
def updateFunc(new_values, last_sum,time): 
    time[0] += 5 
    if time == 10: 
     time = 0 
     return None 
    return (last_sum or []) + new_values 

data = lines.flatMap(lambda line: line.split(" "))\ 
        .map(lambda word: (word, ['t','t1'])) \ 
        .updateStateByKey(lambda x,y :updateFunc(x,y,time)) 
data.pprint() 

數據。但是,嘗試在10秒後刷新數據不起作用。 (我這樣做是錯誤的方式)

此外,我曾嘗試使用window

data= lines.flatMap(lambda lime: line.split(' ')\ 
    .map(lambda tag: (tag: ['time', 'value']))\ 
    .window(10, 2)\ 
    .reduceByKey(lambda x,y : y + x)` 


但是,這會產生一個一維長的名單。哪個沒用。 任何線索?謝謝。

回答

0
items = lines.flatMap(lambda x: list(x)).map(lambda x: (x, [('time', 'value')])) 
counts = items.reduceByKeyAndWindow(lambda x, y: x + y, invFunc=None, windowDuration=3, slideDuration=2) 

試試這個