2017-10-12 21 views
0

我有一個像反向一對多轉型星火

a1<tab>b1,b2,b3,b4 
a2<tab>b1,b2,b3 
........ 
........ 
........ 
aN<tab>bX,bY,bZ 

我想的一樣變換成相反的方式即

b1 -> a1 
    b1 -> aY 
.... 
.... 
    b2 -> aX 
    b2 -> aY 
    b2 -> aZ 
.... 
.... 
    bN -> a1 
    bN -> aY 

如何實現無堵塞堆空間相同的數據集呢?

我已經使用mapPartition做分區智能轉換,以避免洗牌,然後使用distcp按鍵合併。但是,當一個鍵的數值數量很高時,火花作業似乎失敗。

相關的代碼片斷如下:

val res_rdd=rdd.mapPartitions{ 
     iterator => { 
        iterator.toList 
         .map(f => (f.split("\t")(1).split(","),f.split("\t")(0))).flatMap(k => k._1.map(y=> (y,k._2))) 
         .iterator 
        } 
    } 


    import sqlContext.implicits._ 
    val df=res_rdd.toDF("newKey","newValue") 
    df.write.partitionBy("newKey").text(outputPath) 

最終的結果需要每 「則newkey」 文件中有所有 「NEWVALUE」 S。

回答

2

請不要轉換爲List iterator.toList

沒有理由把一切都在內存中。

有來自mapPartitions沒有收穫,這將是更好地使用Dataset一路:由「則newkey」

import org.apache.spark.sql.functions._ 

spark.read.option("delimiter", "\t").csv(path).toDF("key", "value") 
    .withColumn("value", explode(split(col("value"), ","))) 
+0

其實我有組值也是如此。基本上,最後,我需要在每行「NewKey」中包含所有「newValue」的文件。沒有mapPartitions的 ,洗牌非常高。 – taransaini43

+0

也是,因爲我試圖像你提到的那樣,同時將結果數據框寫爲 df.write.partitionBy(「value」).text(outputPath),許多任務在一段時間後失敗,並且嘗試重新嘗試時已給文件由於有些數據已經寫入,因此寫入。 – taransaini43

+0

如何有效地寫入已轉換的數據幀而不會失敗,因爲轉換部分沒問題。 – taransaini43