我發現問題的原因出在 Spark 的 partitionBy，詳見這個問題：
Spark partitionBy much slower than without it
所以我改用如下的方式自行實作分區寫入，不僅解決了這個問題，效能也順帶提升了：
withPartition = withPartition.persist(StorageLevel.MEMORY_AND_DISK());
Dataset<DayAndHour> daysAndHours = withPartition.map(mapToDayHour(), Encoders.bean(DayAndHour.class)).distinct();
DayAndHour[] collect = (DayAndHour[])daysAndHours.collect();
Arrays.sort(collect);
logger.info("found " + collect.length +" different days and hours: "
+ Arrays.stream(collect).map(DayAndHour::toString).collect(Collectors.joining(",")) );
long time = System.currentTimeMillis();
for(DayAndHour dayAndHour : collect){
int day = dayAndHour.getDay();
int hour = dayAndHour.getHour();
logger.info("Start filter on " + dayAndHour);
Dataset<Row> filtered = withPartition.filter(filterDayAndHour(day, hour))
.drop("day", hour");
String newPath = path + "/"
+ "day" +"=" +day +"/"
+ "hour" +"=" + hour;
long specificPathCount = filtered.count();
long timeStart = System.currentTimeMillis();
logger.info("writing " + specificPathCount+ " event to " + newPath );
filtered.write()
.format(format)
.mode(SaveMode.Append)
.save(newPath);
logger.info("Finish writing partition of " + dayAndHour+ " to "+ newPath+ ". Wrote [" + specificPathCount +"] events in " + TimeUtils.tookMinuteSecondsAndMillis(timeStart, System.currentTimeMillis()));
}
logger.info("Finish writing " + path+ ". Wrote [" + cnt +"] events in " + MinuteTimeUtils.tookMinuteSecondsAndMillis(time, System.currentTimeMillis()));
withPartition.unpersist();
private static MapFunction<Row, DayAndHour> mapToDayHour() {
return new MapFunction<Row, DayAndHour>() {
@Override
public DayAndHour call(Row value) throws Exception {
int day = value.getAs("day");
int hour = value.getAs(hour");
DayAndHour dayAndHour = new DayAndHour();
dayAndHour.setDay(day);
dayAndHour.setHour(hour);
return dayAndHour;
}
};
}
private static FilterFunction<Row> filterDayAndHour(int day, int hour) {
return new FilterFunction<Row>() {
@Override
public boolean call(Row value) throws Exception {
int cDay = value.getAs("day");
int cHour = value.getAs(hour");
return day == cDay && hour == cHour;
}
};
}
// And the accompanying POJO used as the (day, hour) partition key
public class DayAndHour implements Serializable , Comparable<DayAndHour>{
private int day;
private int hour;
public int getDay() {
return day;
}
public void setDay(int day) {
this.day = day;
}
public int getHour() {
return hour;
}
public void setHour(int hour) {
this.hour = hour;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DayAndHour that = (DayAndHour) o;
if (day != that.day) return false;
return hour == that.hour;
}
@Override
public int hashCode() {
int result = day;
result = 31 * result + hour;
return result;
}
@Override
public String toString() {
return "(" +
"day=" + day +
", hour=" + hour +
')';
}
@Override
public int compareTo(DayAndHour dayAndHour) {
return Integer.compare((day * 100) + hour, (dayAndHour.day * 100) + dayAndHour.hour);
}
}