2017-06-13 34 views
0

在Spark中使用Java進行編碼時,我一直面臨Spark中的reduceByKey中的參數問題。我不明白在reduceByKey函數中使用的參數。我知道reduceByKey的意思和它的工作方式。但是,下面的碼是從基本火花代碼例子有一點不同(例如:字計數例子)Spark中的ReduceByKey中的參數

正如你可以看到,有在reduceByKey兩個參數是new KrukalReducer(numPoints)numSubGraphs.numSubGraphs是整數值和KruskalReducer是java類。

mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
        new KruskalReducer(numPoints), numSubGraphs); 

我不明白爲什麼這樣的整數變量用於reduceByKey。我試圖用ReduceByKey連接兩個參數到概念,但沒有得到它。

我附加了java類以供參考。

public static final class KruskalReducer implements Function2<Iterable<Edge>, Iterable<Edge>, Iterable<Edge>>{ 
     private static final long serialVersionUID = 1L; 
     private transient UnionFind uf = null; 
     private final int numPoints; 

     public KruskalReducer(int numPoints) { 
      this.numPoints = numPoints; 
     } 

     // merge sort 
     @Override 
     public Iterable<Edge> call(Iterable<Edge> leftEdges, Iterable<Edge> rightEdges) throws Exception{ 
      uf = new UnionFind(numPoints); 
      List<Edge> edges = Lists.newArrayList(); 
      Iterator<Edge> leftEdgesIterator = leftEdges.iterator(); 
      Iterator<Edge> rightEdgesIterator = rightEdges.iterator(); 
      Edge leftEdge = leftEdgesIterator.next(); 
      Edge rightEdge = rightEdgesIterator.next(); 
      Edge minEdge; 
      boolean isLeft; 
      Iterator<Edge> minEdgeIterator; 
      final int numEdges = numPoints - 1; 
      do { 
       if (leftEdge.getWeight() < rightEdge.getWeight()) { 
        minEdgeIterator = leftEdgesIterator; 
        minEdge = leftEdge; 
        isLeft = true; 
       } else { 
        minEdgeIterator = rightEdgesIterator; 
        minEdge = rightEdge; 
        isLeft = false; 
       } 
       if (uf.unify(minEdge.getLeft(), minEdge.getRight())) { 
        edges.add(minEdge); 
       } 
       minEdge = minEdgeIterator.hasNext() ? minEdgeIterator.next() : null; 
       if (isLeft) { 
        leftEdge = minEdge; 
       } else { 
        rightEdge = minEdge; 
       } 
      }while (minEdge != null && edges.size() < numEdges); 
      minEdge = isLeft ? rightEdge : leftEdge; 
      minEdgeIterator = isLeft ? rightEdgesIterator : leftEdgesIterator; 

      while (edges.size() < numEdges && minEdgeIterator.hasNext()) { 
       if (uf.unify(minEdge.getLeft(), minEdge.getRight())) { 
        edges.add(minEdge); 
       } 
       minEdge = minEdgeIterator.next(); 
      } 
      return edges; 
     } 
    } 

另外,全相關的代碼被示爲如下。 (如果你感到困惑,你可以跳過此代碼)

JavaPairRDD<Integer, Iterable<Edge>> mstToBeMerged = partitions.combineByKey(new CreateCombiner(), 
        new Merger(), new KruskalReducer(numPoints)); 


JavaPairRDD<Integer, Iterable<Edge>> mstToBeMergedResult = null; 
while (numSubGraphs > 1){ 
    numSubGraphs = (numSubGraphs + (K - 1))/K; 
    mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
       new KruskalReducer(numPoints), numSubGraphs); 
    mstToBeMerged = mstToBeMergedResult; 
    displayResults(mstToBeMerged); 
} 


private static class CreateCombiner implements Function<Edge, Iterable<Edge>>{ 

     private static final long serialVersionUID = 1L; 

     @Override 
     public Iterable<Edge> call(Edge edge) throws Exception { 
      List<Edge> edgeList = Lists.newArrayListWithCapacity(1); 
      edgeList.add(edge); 
      return edgeList; 
     } 
    } 

    private static class Merger implements Function2<Iterable<Edge>, Edge, Iterable<Edge>>{ 

     private static final long serialVersionUID = 1L; 

     @Override 
     public Iterable<Edge> call(Iterable<Edge> list, Edge edge) throws Exception { 
      List<Edge> mergeList = Lists.newArrayList(list); 
      mergeList.add(edge); 
      return mergeList; 
     } 
    } 

回答

1

餘did't明白爲什麼這樣的整數變量用於 reduceByKey。我試圖用 ReduceByKey連接兩個參數到概念,但未能得到它。

如果我讀正確的過載:

def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] = 
    fromRDD(rdd.reduceByKey(func, numPartitions)) 

然後你傳遞的數量是在基礎RDD分區的數量。因爲reduceByKey是洗牌邊界操作,所以數據將被重新分區,並通過該數字允許您控制將分配多少個分區。

相關問題