2016-05-17 25 views
2

我想檢查一個新圖(稱爲A)是否是其他圖(稱爲B)的子圖。我寫了一個測試的小演示,但失敗了!我跑只是火花外殼演示,火花版本1.6.1:如何使用Spark圖形函數遮罩?

// Build the GraphB 
val usersB = sc.parallelize(Array(
    (3L, ("rxin", "student")), 
    (7L, ("jgonzal","postdoc")), 
    (5L, ("franklin", "prof")), 
    (2L, ("istoica", "prof")) 
)) 

val relationshipsB = sc.parallelize(Array(
    Edge(3L, 7L, "collab"), 
    Edge(5L, 3L, "advisor"), 
    Edge(2L, 5L, "colleague"), 
    Edge(5L, 7L, "pi") 
)) 

val defaultUser = ("John Doe", "Missing") 

val graphB = Graph(usersB, relationshipsB, defaultUser) 

// Build the initial Graph A 
val usersA = sc.parallelize(Array(
    (3L, ("rxin", "student")), 
    (7L, ("jgonzal", "postdoc")), 
    (5L, ("franklin", "prof")) 
)) 

val relationshipsA = sc.parallelize(Array(
    Edge(3L, 7L, "collab"), 
    Edge(5L, 3L, "advisor") 
)) 

val testGraphA = Graph(usersA, relationshipsA, defaultUser) 

//do the mask 
val maskResult = testGraphA.mask(graphB) 
maskResult.edges.count 
maskResult.vertices.count 

在我的理解API on spark website,屏蔽功能可按能得到所有相同的邊和頂點。然而,結果是頂點只是正確的(maskResult.vertices.count = 3),邊數應該是2但不是(maskResult.edges.count = 0)。

回答

2

如果你去看看the source,你會看到mask使用EdgeRDD.innerJoin。如果您在the documentation去找innerJoin,你會看到警告:

內加入本EdgeRDD與另一EdgeRDD,假設兩個使用相同PartitionStrategy分區

您將需要創建並使用PartitionStrategy。如果你做到以下幾點,它會得到你想要的結果(但可能不會很好地進行縮放):

object MyPartStrat extends PartitionStrategy { 
    override def getPartition(s: VertexId, d: VertexId, n: PartitionID) : PartitionID = { 
    1  // this is just to prove the point, you'll need a real partition strategy 
    } 
} 

然後,如果你這樣做:

val maskResult = testGraphA.partitionBy(MyPartStrat).mask(graphB.partitionBy(MyPartStrat)) 

你會得到你想要的結果。但就像我說的,你可能需要找出一個更好的分區策略,而不是把所有東西都塞進一個分區。

+0

不錯的答案。我只想補充一點,他可以選擇可以在[這裏]找到的預先打包的分區策略之一(http://spark.apache.org/docs/1.5.1/api/scala/index.html#org。 apache.spark.graphx.PartitionStrategy $)。所以,也許他不需要真正創建一個,他可以像'testGraphA.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)' –

+1

好,稍後會添加到我的答案 –