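Parameters of reduceByKey in Spark

While writing Spark code in Java, I keep getting stuck on the parameters of reduceByKey. I know what reduceByKey means and how it works, but I don't understand the arguments passed to it in the code below, which differs a little from the basic Spark examples (e.g. word count).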
In the call below, reduceByKey receives two arguments: new KruskalReducer(numPoints) and numSubGraphs. numSubGraphs is an integer and KruskalReducer is a Java class.
mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
        new KruskalReducer(numPoints), numSubGraphs);
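I don't understand why an integer variable like this is passed to reduceByKey. I tried to relate the two arguments to the concept of reduceByKey, but couldn't make sense of it.
I've attached the Java class for reference.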
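For reference, the Spark Java API has an overload JavaPairRDD.reduceByKey(Function2<V, V, V> func, int numPartitions), where the integer sets how many partitions the reduced RDD has and keys are assigned to partitions by hashing. The sketch below is a plain-Java simulation of that behavior, not Spark code; ReduceByKeySim and its methods are hypothetical names, and the modulo rule mirrors what Spark's HashPartitioner does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java simulation (no Spark) of reduceByKey(func, numPartitions):
// records are hash-partitioned by key into numPartitions buckets, and values
// sharing a key are folded together with func inside their bucket.
final class ReduceByKeySim {
    // Non-negative hashCode modulo numPartitions, as HashPartitioner does.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    static <K, V> List<Map<K, V>> reduceByKey(List<Map.Entry<K, V>> records,
                                              BinaryOperator<V> func,
                                              int numPartitions) {
        List<Map<K, V>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new LinkedHashMap<>());
        }
        for (Map.Entry<K, V> r : records) {
            Map<K, V> bucket = partitions.get(partitionFor(r.getKey(), numPartitions));
            // merge() calls func only when the key is already in the bucket
            bucket.merge(r.getKey(), r.getValue(), func);
        }
        return partitions;
    }
}
```

With records (0,1), (1,2), (2,3), (0,4), calling reduceByKey(records, Integer::sum, 2) puts keys 0 and 2 in partition 0 as {0=5, 2=3} and key 1 in partition 1 as {1=2}. Changing the integer changes only how the keys are spread over partitions, not the reduced values.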
// Needs: java.util.Iterator, java.util.List, org.apache.spark.api.java.function.Function2,
// com.google.common.collect.Lists (Guava), plus the Edge and UnionFind classes.
public static final class KruskalReducer implements Function2<Iterable<Edge>, Iterable<Edge>, Iterable<Edge>> {
    private static final long serialVersionUID = 1L;
    private transient UnionFind uf = null;
    private final int numPoints;

    public KruskalReducer(int numPoints) {
        this.numPoints = numPoints;
    }

    // Merge step of Kruskal's algorithm: walk both weight-sorted edge lists like the
    // merge phase of merge sort, keeping an edge only if it joins two different
    // components, until numPoints - 1 edges have been collected.
    @Override
    public Iterable<Edge> call(Iterable<Edge> leftEdges, Iterable<Edge> rightEdges) throws Exception {
        uf = new UnionFind(numPoints);
        List<Edge> edges = Lists.newArrayList();
        Iterator<Edge> leftEdgesIterator = leftEdges.iterator();
        Iterator<Edge> rightEdgesIterator = rightEdges.iterator();
        // next() would throw on an empty input, so return the other side unchanged.
        if (!leftEdgesIterator.hasNext()) {
            return rightEdges;
        }
        if (!rightEdgesIterator.hasNext()) {
            return leftEdges;
        }
        Edge leftEdge = leftEdgesIterator.next();
        Edge rightEdge = rightEdgesIterator.next();
        Edge minEdge;
        boolean isLeft;
        Iterator<Edge> minEdgeIterator;
        final int numEdges = numPoints - 1;
        do {
            if (leftEdge.getWeight() < rightEdge.getWeight()) {
                minEdgeIterator = leftEdgesIterator;
                minEdge = leftEdge;
                isLeft = true;
            } else {
                minEdgeIterator = rightEdgesIterator;
                minEdge = rightEdge;
                isLeft = false;
            }
            if (uf.unify(minEdge.getLeft(), minEdge.getRight())) {
                edges.add(minEdge);
            }
            minEdge = minEdgeIterator.hasNext() ? minEdgeIterator.next() : null;
            if (isLeft) {
                leftEdge = minEdge;
            } else {
                rightEdge = minEdge;
            }
        } while (minEdge != null && edges.size() < numEdges);
        // One side is exhausted (or the tree is complete): drain the other side,
        // starting from its current edge and including its last edge.
        minEdge = isLeft ? rightEdge : leftEdge;
        minEdgeIterator = isLeft ? rightEdgesIterator : leftEdgesIterator;
        while (minEdge != null && edges.size() < numEdges) {
            if (uf.unify(minEdge.getLeft(), minEdge.getRight())) {
                edges.add(minEdge);
            }
            minEdge = minEdgeIterator.hasNext() ? minEdgeIterator.next() : null;
        }
        return edges;
    }
}
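To check the merge logic in isolation, here is a self-contained sketch of the same idea. Edge, UnionFind and KruskalMerge are hypothetical stand-ins (the original Edge and UnionFind classes are not shown), it uses index-based lists instead of iterators, and it assumes both inputs are already sorted by ascending weight.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the question's Edge class.
record Edge(int left, int right, double weight) {}

// Hypothetical minimal union-find with path halving.
final class UnionFind {
    private final int[] parent;

    UnionFind(int n) {
        parent = new int[n];
        for (int i = 0; i < n; i++) parent[i] = i;
    }

    int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    // Returns true if a and b were in different sets, i.e. the edge is kept.
    boolean unify(int a, int b) {
        int ra = find(a), rb = find(b);
        if (ra == rb) return false;
        parent[ra] = rb;
        return true;
    }
}

final class KruskalMerge {
    // Merges two weight-sorted edge lists, keeping an edge only if it joins two
    // components, and stops once numPoints - 1 edges are collected.
    static List<Edge> merge(List<Edge> left, List<Edge> right, int numPoints) {
        UnionFind uf = new UnionFind(numPoints);
        List<Edge> result = new ArrayList<>();
        int i = 0, j = 0;
        while ((i < left.size() || j < right.size()) && result.size() < numPoints - 1) {
            Edge next;
            // Take from the left only when it is strictly lighter (ties go right).
            if (j >= right.size()
                    || (i < left.size() && left.get(i).weight() < right.get(j).weight())) {
                next = left.get(i++);
            } else {
                next = right.get(j++);
            }
            if (uf.unify(next.left(), next.right())) {
                result.add(next);
            }
        }
        return result;
    }
}
```

With inputs [(0,1,1.0), (1,2,3.0), (0,2,4.0)] and [(2,3,2.0), (1,3,5.0)] on 4 points, it returns the edges of weight 1.0, 2.0 and 3.0 and stops there, since a spanning tree of 4 points has 3 edges.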
The full relevant code is shown below as well (feel free to skip it if it's confusing).
JavaPairRDD<Integer, Iterable<Edge>> mstToBeMerged = partitions.combineByKey(new CreateCombiner(),
        new Merger(), new KruskalReducer(numPoints));
JavaPairRDD<Integer, Iterable<Edge>> mstToBeMergedResult = null;
while (numSubGraphs > 1) {
    numSubGraphs = (numSubGraphs + (K - 1)) / K;
    mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
            new KruskalReducer(numPoints), numSubGraphs);
    mstToBeMerged = mstToBeMergedResult;
    displayResults(mstToBeMerged);
}
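The update numSubGraphs = (numSubGraphs + (K - 1)) / K is integer ceiling division, so each round passes a smaller partition count to reduceByKey until it reaches 1. The sketch below only replays that arithmetic; PartitionSchedule is a hypothetical name, and it assumes K >= 2 (with K = 1 the count never shrinks and the loop never ends).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: replays the loop's partition-count updates.
final class PartitionSchedule {
    // Returns the numSubGraphs value passed to reduceByKey in each round.
    static List<Integer> rounds(int numSubGraphs, int k) {
        if (k < 2) {
            throw new IllegalArgumentException("K must be at least 2");
        }
        List<Integer> counts = new ArrayList<>();
        while (numSubGraphs > 1) {
            numSubGraphs = (numSubGraphs + (k - 1)) / k; // ceil(numSubGraphs / k)
            counts.add(numSubGraphs);
        }
        return counts;
    }
}
```

For example, starting from 10 subgraphs with K = 2, the partition counts are 5, 3, 2 and 1.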
private static class CreateCombiner implements Function<Edge, Iterable<Edge>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Edge> call(Edge edge) throws Exception {
        List<Edge> edgeList = Lists.newArrayListWithCapacity(1);
        edgeList.add(edge);
        return edgeList;
    }
}

private static class Merger implements Function2<Iterable<Edge>, Edge, Iterable<Edge>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Edge> call(Iterable<Edge> list, Edge edge) throws Exception {
        List<Edge> mergeList = Lists.newArrayList(list);
        mergeList.add(edge);
        return mergeList;
    }
}
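In the combineByKey call, CreateCombiner, Merger and KruskalReducer fill the createCombiner, mergeValue and mergeCombiners roles. The plain-Java simulation below (hypothetical CombineByKeySim, not Spark code) shows where each one is applied: the first two build one combiner per key within a partition, and the third merges combiners for the same key coming from different partitions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical plain-Java simulation (no Spark) of combineByKey's three functions.
final class CombineByKeySim {
    static <K, V, C> Map<K, C> combineByKey(List<List<Map.Entry<K, V>>> partitions,
                                            Function<V, C> createCombiner,
                                            BiFunction<C, V, C> mergeValue,
                                            BinaryOperator<C> mergeCombiners) {
        Map<K, C> result = new LinkedHashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            // Phase 1: one combiner per key inside this partition.
            Map<K, C> local = new LinkedHashMap<>();
            for (Map.Entry<K, V> r : partition) {
                C current = local.get(r.getKey());
                local.put(r.getKey(), current == null
                        ? createCombiner.apply(r.getValue())       // first value for the key
                        : mergeValue.apply(current, r.getValue())); // later values
            }
            // Phase 2: merge combiners for the same key across partitions.
            local.forEach((key, c) -> result.merge(key, c, mergeCombiners));
        }
        return result;
    }
}
```

With partitions [("a",1), ("b",2), ("a",3)] and [("a",4)] and list-building functions, key "a" ends up as [1, 3, 4] and key "b" as [2].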