我已經寫了一個火花的工作。看起來像下面這樣:火花正在洗牌大量數據
public class TestClass {
public static void main(String[] args){
String masterIp = args[0];
String appName = args[1];
String inputFile = args[2];
String output = args[3];
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<String> rdd = sparkContext.textFile(inputFile);
Integer[] keyColumns = new Integer[] {0,1,2};
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns);
Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1);
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2;
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2;
JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = -6293440291696487370L;
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] record = t.split(",");
Integer[] keyColumns = broadcastJob.value();
StringBuilder key = new StringBuilder();
for (int index = 0; index < keyColumns.length; index++) {
key.append(record[keyColumns[index]]);
}
key.append("|id=1");
Integer value = new Integer(record[4]);
return new Tuple2<String, Integer>(key.toString(),value);
}}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2);
pairRDD.saveAsTextFile(output);
}
}
該程序計算每個鍵的值的總和。 根據我的理解,本地組合器應該在每個節點上運行,並將相同鍵的值相加,然後在少量數據的情況下進行混洗。 但在SparkUI上,它顯示了大量的隨機讀取和隨機寫入(差不多58GB)。 我做錯了什麼? 如何知道本地組合器是否工作?
羣集細節: -
20個節點集羣
具有80GB的硬盤來,8GB RAM,4個核每個節點
Hadoop的2.7.2
火花2.0.2(預生成與 - Hadoop的2.7.x分佈)
輸入文件的詳細信息: -
輸入文件存儲在HDFS
輸入文件大小:400GB
多項紀錄:16129999990
戰績列:字符串(2 char),int,int,String(2 char),int,int,String(2 char),String(2 char),String(2 char) 在火花日誌中,我看到使用localitylevel NODE_LOCAL運行的任務。
你爲什麼要用combineByKey和reduceByKey? ReduceByKey將使用reducer作爲組合器,就像在你的例子中一樣 –