2015-12-15 78 views
2

我正在從基於MPI的系統轉向Apache Spark。我需要在Spark中執行以下操作。在Spark中爲每個Executor創建數組並將其合併爲RDD

假設,我有n頂點。我想從這些n頂點創建一個邊緣列表。邊只是兩個整數(u,v)的元組,不需要屬性。

但是,我想在每個執行器中獨立地並行創建它們。因此,我想要爲P Spark Executors獨立創建P邊陣列。每個數組可能具有不同的大小,並且取決於頂點,因此,我還需要執行者ID從0n-1。接下來,我想要一個全局RDD陣列的邊緣。

在MPI中,我會在每個使用處理器級別的處理器中創建一個數組。我如何在Spark中做到這一點,特別是使用GraphX庫?

因此,我的主要目標是在每個執行器中創建一個邊數組,並將它們合併爲一個RDD。

我首先嚐試了鄂爾多斯 - 仁義模型的一個修改版本。作爲參數,我只有節點數n和概率p。

假設,執行者i必須處理從101200的節點。對於任何節點而言,節點101,它將以概率p創建從101102 -- n的邊。在每個執行器創建分配的邊後,我將實例化GraphX EdgeRDDVertexRDD。因此,我的計劃是在每個執行器中獨立創建邊界列表,並將它們合併到RDD中。

+1

我不知道MPI什麼,但是從你的描述,我可以告訴你想你的問題的辦法是太「低級別」。在Spark中,您不用擔心哪個執行程序正在存儲哪個數組。只需創建RDD,Spark將自動處理數據的分發和處理。我還建議你閱讀'GraphX'的文檔,因爲需要以某種方式定義Vertices和Edges以用於'GraphX'。 –

+0

感謝您的建議。我試圖並行實現一個圖形發生器。發生器必須以這種方式創建邊緣,以使計算負載得到很好的平衡。 – max

回答

4

讓我們開始與一些進口和變量,將需要的下游加工:

import org.apache.spark._ 
import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 
import scala.util.Random 
import org.apache.spark.HashPartitioner 

val nPartitions: Integer = ??? 
val n: Long = ??? 
val p: Double = ??? 

下一步我們需要可以用來生成邊緣種子ID的RDD。處理這種幼稚的方法是簡單地是這樣的:

sc.parallelize(0L to n) 

由於產生的邊緣的數量取決於節點ID這種方法會給出一個極不平衡的負載。我們可以做的更好一點與重新分區:

sc.parallelize(0L to n) 
    .map((_, None)) 
    .partitionBy(new HashPartitioner(nPartitions)) 
    .keys 

,但更好的方法是先從空RDD和生成到位的ID。我們需要一個小幫手:

def genNodeIds(nPartitions: Int, n: Long)(i: Int) = { 
    (0L until n).filter(_ % nPartitions == i).toIterator 
} 

可以使用如下:

val empty = sc.parallelize(Seq.empty[Int], nPartitions) 
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i)) 

只是快速完整性檢查(這是相當昂貴,因此在生產中不使用它):

require(ids.distinct.count == n) 

,我們可以使用另一個幫手產生實際的邊緣:

def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = { 
    (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j,())) 
} 

def genEdgesForPartition(iter: Iterator[Long]) = { 
    // It could be an overkill but better safe than sorry 
    // Depending on your requirement it could worth to 
    // consider using commons-math 
    // https://commons.apache.org/proper/commons-math/userguide/random.html 
    val random = new Random(new java.security.SecureRandom()) 
    iter.flatMap(genEdgesForId(p, n, random)) 
} 

val edges = ids.mapPartitions(genEdgesForPartition) 

最後,我們可以創建一個圖表:

val graph = Graph.fromEdges(edges,()) 
+0

非常感謝,這是非常徹底的指導。這對我對Spark的理解也很有幫助。 – max

+1

我簡化了這一點。它還應該解決您在其他問題中提到的問題。 – zero323

相關問題