1

我想從spark-streaming保存到幾個彈性搜索索引。 我創建了<key(index), value>對,當我執行groupByKey時,結果是的Tuple,但爲了使用elasticsearch-spark插件保存到elasticsearch,我需要的值爲JavaRDD<value>將iterable轉換爲RDD

我知道有一個sparkContext.parallelize(list)選項可以從列表創建JavaRDD,但是這隻能在驅動程序上執行。

是否有另一種選擇來創建可以在執行程序上執行的JavaRDD?或者我可以通過另一種方式實現對執行程序有效的Tuple2<key(index), JavaRDD<value>>? 如果不是,我怎麼才能使Iterator只能切換到驅動程序上的JavaRDD,以及插件是否在執行程序中寫入elasticsearch?

感謝,

丹妮拉

+0

埃姆,AFAIK,'groupByKey'產生'JavaPairRDD >',它仍然是'rdd' 。 rdd的任何進一步處理都在執行程序上執行,而不是在驅動程序上執行。 –

回答

1

我會說,它必須是能夠有像水木清華以下

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2()); 
JavaRDD<Value> onlyValues = values.flatMap(it -> it); 

替代辦法是

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1); 
JavaRDD<Value> values = keyValues.map(t2 -> t2._2()); 
+0

感謝葉甫,因爲我需要從JavaPairRDD <鑰匙,可迭代>去JavaRDD 裏面foreachRDD JavaRDD 值的結果= rdd.flatMap((FlatMapFunction >,字符串>)tuple2 - > { final列表 l = Lists.newArrayList(); tuple2._2()。forEach(l :: add); return l; });與相同的密鑰有關? – Daniela

+0

我可能誤解了你的問題。我會編輯我的答案,希望這次會更好。 – evgenii

相關問題