2017-06-12 34 views
0

我有一個Spark流,其中記錄流入。間隔大小爲1秒。 我想聯合流中的所有數據。因此,我創建了一個空的RDD,然後使用變換方法,將RDD(在流中)與此空RDD結合起來。 我期待這個空的RDD在最後有所有的數據。 但是,此RDD始終保持空白。聯合未發生Spark轉換

另外,有人可以告訴我,如果我的邏輯是正確的。

JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD(); 
JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record)) 
        .transform(rdd -> rdd.union(records)); 

transformedMessages.foreachRDD(record -> { 
System.out.println("Aman" +record.count()); 
StructType schema = DataTypes.createStructType(fields); 

Dataset ds = ss.createDataFrame(records, schema); 
ds.createOrReplaceTempView("tempTable"); 
ds.show(); 

}); 

回答

0

最初,records爲空。

然後我們有transformedMessages = messages + records,但records是空的,所以我們有:transformedMessages = messages後來(避免了flatmap函數,而不是相關的討論)

,當我們做Dataset ds = ss.createDataFrame(records, schema);records 仍然是空的。這在程序流程中並沒有改變,所以它隨着時間的推移將保持爲空。

我想我們要做的是什麼,而不是

.transform(rdd -> rdd.union(records)); 

我們應該做的:

.foreachRDD{rdd => records = rdd.union(records)} //Scala: translate to Java syntax 

不過,請注意,這個過程反覆增加了血統的'記錄'RDD,並且還會隨着時間的推移累積所有數據。這不是一項可以長時間穩定運行的工作,因爲最終如果有足夠的數據,它會超出系統的限制。

沒有關於這個問題背後的用例的信息,但目前的方法似乎沒有可擴展性和可持續性。

+0

謝謝,我明白你的觀點。 但是那麼變換方法有什麼用?我已經閱讀了各種文件,其中說它使用Transform方法來合併RDD數據。 –

+0

另外,我的目的是在某些時間創建DataSet以外的「記錄」RDD。我正在考慮使用「forEach」這個代碼(創建DataSet),但現在我在這裏聯合RDD,我應該在哪裏放置我的代碼來創建DataSet –

+0

@AmanpreetKhurana你可以給這個問題添加一些背景嗎?你想解決什麼問題?在我看來,目前的做法不會隨着時間的推移。 – maasg