2014-10-28 41 views
0

我想從DStream中刪除前n個RDD。我嘗試使用以下函數與轉換,但它不起作用(錯誤OneForOneStrategy:org.apache.spark.SparkContext java.io.NotSerializableException),我不認爲它會完成我的真正目標,刪除RDDs,因爲它會返回空的。如何從Spark Streaming中的DStream中刪除RDD?

var num = 0 
def dropNrdds(myRDD: RDD[(String, Int)], dropNum: Int) : RDD[(String, Int)] = { 
    if (num < dropNum) { 
     num = num + 1 
     return myRDD 
    } 
    else { 
     return sc.makeRDD(Seq()) 
    } 
} 

回答

1

這個錯誤是因爲你的函數是指你的var num,幷包含類不Serializable。你的函數將被集羣的不同節點調用,所以它依賴的任何東西都必須是Serializable,並且你不能在你的函數的不同調用之間共享一個變量(因爲它們可能在不同的集羣節點上運行)。

考慮到特定的DStream被拆分的方式幾乎是一個實現細節,想要從DStream中刪除特定數量的RDDs似乎很奇怪。也許基於時間的slice方法可以做你想做的事情?

+0

有沒有辦法切掉前n個窗口,但仍然得到流的其餘部分?我看着那個函數,我想我必須指定切片的結束。我的動機是:http://stackoverflow.com/questions/26445407/how-can-i-perform-an-operation-on-two-windowed-dstreams-with-an-offset – Zatricion 2014-10-28 17:32:23

+0

還是有一種方法,我可以保持跟蹤我所做的切片,並隨着事物被添加到輸入流而繼續移動它? – Zatricion 2014-10-29 04:22:52

0

你得到錯誤,因爲,我猜你是從

調用這個函數foreachRdd

循環,這實際上大幹快上executers節點執行,如果要得到執行的東西執行程序節點的代碼必須是Serializable和SparkContext(sc,你在dropNrdds方法中引用它)不是Serializable,因此你會得到那個錯誤。

並來到您的實際問題。

不知道你的要求,但

您可以爲您的RDD並選擇記錄哪些 您的條件相匹配的數據幀。並忽略其餘。

您可以使用過濾器,並創建一個新的RDD帶過濾器的數據。

相關問題