2017-03-08 107 views
1

我已經寫了一個火花的工作。看起來像下面這樣:火花正在洗牌大量數據

public class TestClass { 

public static void main(String[] args){ 
String masterIp = args[0]; 
String appName = args[1]; 
String inputFile = args[2]; 
String output = args[3]; 
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName); 
JavaSparkContext sparkContext = new JavaSparkContext(conf); 
JavaRDD<String> rdd = sparkContext.textFile(inputFile); 
Integer[] keyColumns = new Integer[] {0,1,2}; 
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns); 

Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1); 
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2; 
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2; 

JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() { 
     private static final long serialVersionUID = -6293440291696487370L; 
     @Override 
     public Tuple2<String, Integer> call(String t) throws Exception { 
     String[] record = t.split(","); 
     Integer[] keyColumns = broadcastJob.value(); 
     StringBuilder key = new StringBuilder(); 
     for (int index = 0; index < keyColumns.length; index++) { 
      key.append(record[keyColumns[index]]); 
     } 
     key.append("|id=1"); 
     Integer value = new Integer(record[4]); 
     return new Tuple2<String, Integer>(key.toString(),value); 
     }}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2); 
     pairRDD.saveAsTextFile(output); 
    } 
} 

該程序計算每個鍵的值的總和。 根據我的理解,本地組合器應該在每個節點上運行,並將相同鍵的值相加,然後在少量數據的情況下進行混洗。 但在SparkUI上,它顯示了大量的隨機讀取和隨機寫入(差不多58GB)。 我做錯了什麼? 如何知道本地組合器是否工作?

羣集細節: -
20個節點集羣
具有80GB的硬盤來,8GB RAM,4個核每個節點
Hadoop的2.7.2
火花2.0.2(預生成與 - Hadoop的2.7.x分佈)

輸入文件的詳細信息: -
輸入文件存儲在HDFS
輸入文件大小:400GB
多項紀錄:16129999990
戰績列:字符串(2 char),int,int,String(2 char),int,int,String(2 char),String(2 char),String(2 char) 在火花日誌中,我看到使用localitylevel NODE_LOCAL運行的任務。

enter image description here

+0

你爲什麼要用combineByKey和reduceByKey? ReduceByKey將使用reducer作爲組合器,就像在你的例子中一樣 –

回答

0

讓我們分解這個問題,看看得到什麼。爲了簡化計算,讓我們假設:

  • 記錄總數爲1.6e8
  • 獨特的按鍵數是1E6
  • 分割大小爲128MB(這似乎是與任務在你數UI一致)。

有了這些值,數據將被分成大約3200個分區(在你的情況下爲3125個分區)。這給你每個分區大約51200條記錄。此外,如果每個密鑰的值數量分佈均勻,那麼每個密鑰的平均值應該大約爲160個記錄。

如果數據是隨機分佈的(例如,它沒有按鍵排序),那麼可以預計平均每個分區每個鍵的記錄數將接近1 *。這基本上是地圖邊合併根本不會減少數據量的最壞情況。

此外,您必須記住,平面文件的大小通常會顯着降低序列化對象的大小。

對於現實生活中的數據,您通常可以期望從數據收集過程中出現某種類型的順序,所以事情應該比我們上面計算的好,但底線是,如果數據尚未按分區分組,則地圖側組合可能沒有提供任何改進。

通過使用更大的拆分(256MB會使您每個分區超過100K),您可以減少混洗數據的數量,但它會帶來更長的GC暫停和其他GC問題的代價。


*您可以通過取樣與替代模擬這個:

import pandas as pd 
import numpy as np 

(pd 
    .DataFrame({"x": np.random.choice(np.arange(3200), size=160, replace=True)}) 
    .groupby("x") 
    .x.count() 
    .mean()) 

或只是想想的隨機分配160個球,3200桶的問題。