2015-06-17 58 views
4

我在一個類似用途的情況下使用transform方法在描述變換運算部分的Transformations on DStreams如何使用變換操作和外部RDD過濾dstream?

spamInfoRDD = sc.pickleFile(...) # RDD containing spam information 
# join data stream with spam information to do data cleaning 
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...)) 

我的代碼如下:

sc = SparkContext("local[4]", "myapp") 
ssc = StreamingContext(sc, 5) 
ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/') 
lines = ssc.socketTextStream("localhost", 9999) 
counts = lines.flatMap(lambda line: line.split(" "))\ 
       .map(lambda word: (word, 1))\ 
       .reduceByKey(lambda a, b: a+b) 
filter_rdd = sc.parallelize([(u'A', 1), (u'B', 1)], 2) 
filtered_count = counts.transform(
    lambda rdd: rdd.join(filter_rdd).filter(lambda k, (v1, v2): v1 and not v2) 
) 
filtered_count.pprint() 
ssc.start() 
ssc.awaitTermination() 

但我得到以下錯誤

看來您正試圖廣播RDD或從ac引用RDD重刑或轉型。 RDD轉換和操作只能由驅動程序調用,而不能在其他轉換中調用;例如,rdd1.map(lambda x:rdd2.values.count()* x)無效,因爲值轉換和計數操作不能在rdd1.map轉換中執行。有關更多信息,請參閱SPARK-5063。

我該如何使用外部RDD過濾dstream中的元素?

+0

你得到了這個答案 – Bg1850

回答

3

Spark doc示例與您的代碼之間的區別在於使用了ssc.checkpoint()。

儘管您提供的具體代碼示例將在沒有檢查點的情況下運行,但我想您確實需要它。但將外部RDD引入檢查點DStream的範圍的概念可能無效:從檢查點恢復時,外部RDD可能已更改。

我試圖檢查點外部的RDD,但我也沒有運氣。

相關問題