2017-04-08 61 views
0

有可能使用updateStateByKey()函數與元組作爲值嗎?我使用PySpark,我的輸入是(word, (count, tweet_id)),這意味着word是一個關鍵字,而一個元組(count, tweet_id)是一個值。 updateStateByKey的任務是爲每個單詞總結他們的計數並創建包含該單詞的所有tweet_id的列表。Spark Streaming updateStateByKey與元組作爲值

我實現了以下更新功能,但我得到了錯誤列表索引超出範圍new_values索引1:

def updateFunc(new_values, last_sum): 
    count = 0 
    tweets_id = [] 
    if last_sum: 
    count = last_sum[0] 
    tweets_id = last_sum[1] 
    return sum(new_values[0]) + count, tweets_id.extend(new_values[1]) 

並調用方法:

running_counts.updateStateByKey(updateFunc) 
+0

可以分享pyspark代碼,我可以自己試試這樣一個例子。想知道爲什麼-1被給出 – thebluephantom

+0

我在這個項目中遇到了這個問題https://github.com/dmacjam/twitter-word-cloud/blob/master/processing/trending_words.py –

+0

thx,會嘗試並擺脫,如果減1 – thebluephantom

回答

1

我已經找到了解決辦法。問題出在checkpointing,這意味着當前狀態在發生故障時會保存到磁盤。它造成了一些問題,因爲當我改變了我的狀態定義時,在檢查點它處於沒有元組的舊狀態。因此,我從磁盤中刪除檢查點並實現最終的解決方案爲:

def updateFunc(new_values, last_sum): 
    count = 0 
    counts = [field[0] for field in new_values] 
    ids = [field[1] for field in new_values] 
    if last_sum: 
    count = last_sum[0] 
    new_ids = last_sum[1] + ids 
    else: 
    new_ids = ids 
    return sum(counts) + count, new_ids 

最後,回答我的問題是:是的,狀態可以是一個元組或用於存儲更多值任何其它數據類型。