讓我們開始與一些進口和變量,將需要的下游加工:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner
val nPartitions: Integer = ???
val n: Long = ???
val p: Double = ???
下一步我們需要可以用來生成邊緣種子ID的RDD。處理這種幼稚的方法是簡單地是這樣的:
sc.parallelize(0L to n)
由於產生的邊緣的數量取決於節點ID這種方法會給出一個極不平衡的負載。我們可以做的更好一點與重新分區:
sc.parallelize(0L to n)
.map((_, None))
.partitionBy(new HashPartitioner(nPartitions))
.keys
,但更好的方法是先從空RDD和生成到位的ID。我們需要一個小幫手:
def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
(0L until n).filter(_ % nPartitions == i).toIterator
}
可以使用如下:
val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))
只是快速完整性檢查(這是相當昂貴,因此在生產中不使用它):
require(ids.distinct.count == n)
,我們可以使用另一個幫手產生實際的邊緣:
def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
(i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j,()))
}
def genEdgesForPartition(iter: Iterator[Long]) = {
// It could be an overkill but better safe than sorry
// Depending on your requirement it could worth to
// consider using commons-math
// https://commons.apache.org/proper/commons-math/userguide/random.html
val random = new Random(new java.security.SecureRandom())
iter.flatMap(genEdgesForId(p, n, random))
}
val edges = ids.mapPartitions(genEdgesForPartition)
最後,我們可以創建一個圖表:
val graph = Graph.fromEdges(edges,())
我不知道MPI什麼,但是從你的描述,我可以告訴你想你的問題的辦法是太「低級別」。在Spark中,您不用擔心哪個執行程序正在存儲哪個數組。只需創建RDD,Spark將自動處理數據的分發和處理。我還建議你閱讀'GraphX'的文檔,因爲需要以某種方式定義Vertices和Edges以用於'GraphX'。 –
感謝您的建議。我試圖並行實現一個圖形發生器。發生器必須以這種方式創建邊緣,以使計算負載得到很好的平衡。 – max