我有一個Spark流,其中記錄流入。間隔大小爲1秒。 我想聯合流中的所有數據。因此,我創建了一個空的RDD,然後使用變換方法,將RDD(在流中)與此空RDD結合起來。 我期待這個空的RDD在最後有所有的數據。 但是,此RDD始終保持空白。聯合未發生Spark轉換
另外,有人可以告訴我,如果我的邏輯是正確的。
JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();
JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record))
.transform(rdd -> rdd.union(records));
transformedMessages.foreachRDD(record -> {
System.out.println("Aman" +record.count());
StructType schema = DataTypes.createStructType(fields);
Dataset ds = ss.createDataFrame(records, schema);
ds.createOrReplaceTempView("tempTable");
ds.show();
});
謝謝,我明白你的觀點。 但是那麼變換方法有什麼用?我已經閱讀了各種文件,其中說它使用Transform方法來合併RDD數據。 –
另外,我的目的是在某些時間創建DataSet以外的「記錄」RDD。我正在考慮使用「forEach」這個代碼(創建DataSet),但現在我在這裏聯合RDD,我應該在哪裏放置我的代碼來創建DataSet –
@AmanpreetKhurana你可以給這個問題添加一些背景嗎?你想解決什麼問題?在我看來,目前的做法不會隨着時間的推移。 – maasg