The last stage of RDD.saveAsTextFile is extremely slow. I suspect the problem is that the records are not distributed evenly across the partitions and tasks. Is there a way to force a more even distribution? Spark is not spreading the load evenly across the tasks.
public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating,
                                                      JavaPairRDD<Integer, Integer> userIdClusterId,
                                                      int numPartitions, String outDir) {
    /*
     * convert the JavaRDD<Rating> to JavaPairRDD<Integer, DmRating>
     */
    JavaPairRDD<Integer, DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() {
        public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception {
            return new Tuple2<>(dmRating.user(), (DmRating) dmRating);
        }
    });

    /*
     * join this RDD with the userIdClusterId RDD by key
     */
    JavaPairRDD<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating = userIdClusterId.join(userIdDmRating, numPartitions);

    // extract the clusterId to videoId map
    JavaPairRDD<Integer, Integer> clusterIdVideoId = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userIdDmRatingClusterId) throws Exception {
            Integer userId = userIdDmRatingClusterId._1();
            Tuple2<Integer, DmRating> dmRatingClusterId = userIdDmRatingClusterId._2();
            return new Tuple2<Integer, Integer>(dmRatingClusterId._1(), dmRatingClusterId._2().product());
        }
    });

    // Count the popularity of a video in a cluster
    JavaPairRDD<String, Integer> clusterIdVideoIdStrInt = clusterIdVideoId.mapToPair(new PairFunction<Tuple2<Integer, Integer>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<Integer, Integer> videoIdClusterId) throws Exception {
            return new Tuple2<>(String.format("%d:%d", videoIdClusterId._1(), videoIdClusterId._2()), 1);
        }
    });
    JavaPairRDD<String, Integer> clusterIdVideoIdStrCount = clusterIdVideoIdStrInt.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterId_T_videoIdCount = clusterIdVideoIdStrCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> clusterIdVideoIdStrCount) throws Exception {
            String[] splits = clusterIdVideoIdStrCount._1().split(":");
            try {
                if (splits.length == 2) {
                    int clusterId = Integer.parseInt(splits[0]);
                    int videoId = Integer.parseInt(splits[1]);
                    return new Tuple2<>(clusterId, new Tuple2<>(videoId, clusterIdVideoIdStrCount._2()));
                } else {
                    // Should never occur
                    LOGGER.error("Could not split {} into two with : as the separator!", clusterIdVideoIdStrCount._1());
                }
            } catch (NumberFormatException ex) {
                LOGGER.error(ex.getMessage());
            }
            return new Tuple2<>(-1, new Tuple2<>(-1, -1));
        }
    });
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> clusterIdVideoIdGrouped = clusterId_T_videoIdCount.groupByKey();

    JavaPairRDD<Integer, DmRating> clusterIdDmRating = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, DmRating>() {
        @Override
        public Tuple2<Integer, DmRating> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating) throws Exception {
            return userId_T_clusterIdDmRating._2();
        }
    });
    JavaPairRDD<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> clusterId_T_DmRatingVideoIds = clusterIdDmRating.join(clusterIdVideoIdGrouped, numPartitions);

    JavaPairRDD<Integer, String> userIdStringRDD = clusterId_T_DmRatingVideoIds.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>>, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> v1) throws Exception {
            int clusterId = v1._1();
            Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>> tuple = v1._2();
            DmRating rating = tuple._1();
            Iterable<Tuple2<Integer, Integer>> videosCounts = tuple._2();
            StringBuilder recosStr = new StringBuilder();
            boolean appendComa = false;
            for (Tuple2<Integer, Integer> videoCount : videosCounts) {
                if (appendComa) recosStr.append(",");
                recosStr.append("{");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComa = true;
            }
            String val = String.format("{\"user_id\":\"%s\",\"v1st\":\"%s\",\"redis_uid\":%s,\"cluster_id\": %d,\"recommendations\":[ %s ]}", rating.dmUserId, rating.dmV1stStr, rating.user(), clusterId, recosStr);
            return new Tuple2<Integer, String>(rating.user(), val);
        }
    });

    JavaPairRDD<Integer, Iterable<String>> groupedRdd = userIdStringRDD.groupByKey(numPartitions);
    JavaRDD<String> jsonStringRdd = groupedRdd.map(new Function<Tuple2<Integer, Iterable<String>>, String>() {
        @Override
        public String call(Tuple2<Integer, Iterable<String>> v1) throws Exception {
            for (String str : v1._2()) {
                return str;
            }
            LOGGER.error("Could not fetch a string from iterable so returning empty");
            return "";
        }
    });
    //LOGGER.info("Number of items in RDD: {}", jsonStringRDD.count());
    //return jsonStringRDD.persist(StorageLevel.MEMORY_ONLY_SER_2());
    LOGGER.info("Repartitioning the data into {}", numPartitions);
    jsonStringRdd.cache().saveAsTextFile(outDir);
    return jsonStringRdd;
}
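To see whether the slowness really comes from a skewed final stage, one option is to log the record count per partition just before the saveAsTextFile call. This is a minimal diagnostic sketch and not part of the original code; it assumes the jsonStringRdd and LOGGER from the method above (it also needs java.util.Collections, java.util.Iterator and java.util.List on top of the Spark imports already in use).

    // Sketch: count how many records land in each partition of the RDD that gets saved.
    List<String> partitionSizes = jsonStringRdd.mapPartitionsWithIndex(
            new Function2<Integer, Iterator<String>, Iterator<String>>() {
                @Override
                public Iterator<String> call(Integer partitionIndex, Iterator<String> records) throws Exception {
                    long count = 0;
                    while (records.hasNext()) {
                        records.next();
                        count++;
                    }
                    // emit one "partitionIndex:count" entry per partition
                    return Collections.singletonList(partitionIndex + ":" + count).iterator();
                }
            }, true).collect();
    LOGGER.info("Records per partition (index:count): {}", partitionSizes);

If one or two partitions hold most of the records, the problem is key skew coming from the preceding join/groupByKey rather than the number of partitions.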
Cluster size:
1. Master: 16 CPUs, 32 GB RAM
2. Workers (4): 32 CPUs, 102 GB RAM, 4 x 375 GB SSD drives each
I changed the code to use DataFrames instead. Still the same problem:
public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD,
                                               int numPartitions,
                                               JavaSparkContext javaSparkContext,
                                               String outdir) {
    JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() {
        public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception {
            //Integer userId = v1._1();
            Tuple2<DmRating, Integer> values = v1._2();
            DmRating rating = values._1();
            Integer clusterId = values._2();
            rating.setClusterId(clusterId);
            rating.setVideoId(rating.product());
            rating.setV1stOrUserId((rating.userId == null || rating.userId.isEmpty()) ? rating.v1stId : rating.userId);
            rating.setRedisId(rating.user());
            return rating;
            //return String.format("{\"clusterId\": %s,\"userId\": %s, \"userId\":\"%s\", \"videoId\": %s}", clusterId, userId, rating.userId, rating.product());
        }
    });

    SQLContext sqlContext = new SQLContext(javaSparkContext);
    DataFrame dmRatingDF = sqlContext.createDataFrame(dmRatingJavaRDD, DmRating.class);
    dmRatingDF.registerTempTable("dmrating");
    DataFrame clusterIdVideoIdDF = sqlContext.sql("SELECT clusterId, videoId FROM dmrating").cache();
    DataFrame rolledupClusterIdVideoIdDF = clusterIdVideoIdDF.rollup("clusterId", "videoId").count().cache();
    DataFrame clusterIdUserIdDF = sqlContext.sql("SELECT clusterId, userId, redisId, v1stId FROM dmrating").distinct().cache();

    JavaRDD<Row> rolledUpRDD = rolledupClusterIdVideoIdDF.toJavaRDD();
    JavaRDD<Row> filteredRolledUpRDD = rolledUpRDD.filter(new Function<Row, Boolean>() {
        @Override
        public Boolean call(Row v1) throws Exception {
            //make sure the size and values of the properties are correct
            return !(v1.size() != 3 || v1.isNullAt(0) || v1.isNullAt(1) || v1.isNullAt(2));
        }
    });
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterIdVideoIdCount = filteredRolledUpRDD.mapToPair(new PairFunction<Row, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Row row) throws Exception {
            Tuple2<Integer, Integer> videoIdCount = new Tuple2<Integer, Integer>(row.getInt(1), Long.valueOf(row.getLong(2)).intValue());
            return new Tuple2<Integer, Tuple2<Integer, Integer>>(row.getInt(0), videoIdCount);
        }
    }).cache();

    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> groupedPair = clusterIdVideoIdCount.groupByKey(numPartitions).cache();
    JavaRDD<ClusterIdVideos> groupedFlat = groupedPair.map(new Function<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, ClusterIdVideos>() {
        @Override
        public ClusterIdVideos call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> v1) throws Exception {
            ClusterIdVideos row = new ClusterIdVideos();
            Iterable<Tuple2<Integer, Integer>> videosCounts = v1._2();
            StringBuilder recosStr = new StringBuilder();
            recosStr.append("[");
            boolean appendComa = false;
            for (Tuple2<Integer, Integer> videoCount : videosCounts) {
                if (appendComa) recosStr.append(",");
                recosStr.append("{");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComa = true;
            }
            recosStr.append("]");
            row.setClusterId(v1._1());
            row.setVideos(recosStr.toString());
            return row;
        }
    }).cache();

    DataFrame groupedClusterId = sqlContext.createDataFrame(groupedFlat, ClusterIdVideos.class);
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId");
    recosDf.write().parquet(outdir);
}
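The comments below suggest adding a .repartition(...) call. For this DataFrame version it would replace the last two lines of the method, between the join and the write; this is only a sketch reusing numPartitions and outdir from the method above, and it only helps if the slow part is the write stage rather than the join itself.

    // Sketch only, not the original code: reshuffle the joined result before writing so the
    // output stage does not simply inherit the join's (possibly skewed) partitioning.
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId");
    recosDf.repartition(numPartitions).write().parquet(outdir);

    // The equivalent for the RDD version of the first method would be:
    // jsonStringRdd.repartition(numPartitions).saveAsTextFile(outDir);

Note that repartition only balances the number of rows per output task; if most rows for a single key end up in one task during the join or groupByKey itself, that stage stays slow.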
Use ".repartition(numPartitions: Int)". It will increase parallelism, but it may not solve your problem... Can you post the code? – Sumit
Tried it... it does not affect the time or the partitioning. – Ram
We may be able to help, but we need to see the code, so please post it. – Sumit
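Since repartition reportedly did not change the timing, it is worth checking whether a single hot clusterId dominates the joins that feed the final stage. Below is a rough sketch, not from the original post; it assumes the clusterIdDmRating pair RDD from the first method and needs java.util.Map. countByKey brings the counts back to the driver, hence the sample first.

    // Sketch: count ratings per clusterId on a 1% sample to spot a hot key.
    // If one cluster id accounts for most of the records, the join on clusterId and the
    // following groupByKey will funnel most of the data through a single task.
    Map<Integer, Long> ratingsPerCluster = clusterIdDmRating.sample(false, 0.01).countByKey();
    LOGGER.info("Sampled ratings per clusterId: {}", ratingsPerCluster);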