2017-05-16 83 views
0

嗨,我是Pyspark Streaming的新手。Pyspark流式轉換錯誤

numbers0 = sc.parallelize([1,2,3,4,5]) 

numbers1 = sc.parallelize([2,3,4,5,6]) 

numbers2 = sc.parallelize([3,4,5,6,7]) 

stream0 = ssc.queueStream([numbers0, numbers1, numbers2]) 

stream0.pprint() 

ssc.start() 
ssc.awaitTermination(20) 
ssc.stop() 

這工作得很好,但只要我做了以下我得到一個錯誤:

stream1 = stream0.transform(lambda x: x.mean()) 

stream1.pprint() 

ssc.start() 
ssc.awaitTermination(20) 
ssc.stop() 

我要的是數據流只由我以前流的平均值。 有誰知道我必須做什麼?

+0

有什麼錯誤? –

+0

通過這種方式使用變換,您將有3個條目是每個RDD的手段。 –

回答

0

調用變換時出現的錯誤是因爲它需要一個RDD-RDD函數,如Spark's documentation for the transform operation中所述。當在RDD上調用平均值時,它不會返回新的RDD並因此返回錯誤。

現在,根據我的理解,您要計算由DStream組成的每個RDD的平均值。 DStream是使用queueStream創建的,並且由於命名參數oneAtATime保留爲默認值,因此您的程序將在每個批處理間隔使用一個RDD。

計算平均每個RDD,你通常會做這樣的forEachRDD輸出操作中這樣

# Create stream0 as you do in your example 

def calculate_mean(rdd): 
    mean_value = rdd.mean() 
    # do other stuff with mean_value like saving it to a database or just print it 

stream0.forEachRDD(calculate_mean) 

# Start and stop the Streaming Context