2015-05-19 83 views
2

這個問題是關於Spark GraphX的。我想通過刪除某些其他節點的鄰居節點來計算子圖。如何過濾鄰居頂點類型的混合節點圖

[任務]保留不屬於C2的節點的鄰居節點甲和B節點。

輸入圖:

    ┌────┐ 
       ┌─────│ A │──────┐ 
       │  └────┘  │ 
       v     v 
┌────┐  ┌────┐   ┌────┐  ┌────┐ 
│ C1 │────>│ B │   │ B │<────│ C2 │ 
└────┘  └────┘   └────┘  └────┘ 
      ^    ^
       │  ┌────┐  │ 
       └─────│ A │──────┘ 
        └────┘ 

輸出圖:

  ┌────┐ 
    ┌─────│ A │ 
    │  └────┘ 
    v   
┌────┐   
│ B │   
└────┘   
^   
    │  ┌────┐ 
    └─────│ A │ 
     └────┘ 

如何優雅地寫GraphX查詢,返回輸出圖形?

+0

'Edge.attr'是否有用? –

回答

2

一種不同的方式來找到val nodesAB使用GraphOps.collectNeighbors

val nodesAB = graph.collectNeighbors(EdgeDirection.Either) 
    .filter{case (vid,ns) => ! ns.map(_._2).contains("C2")}.map(_._1) 
    .intersection(
    graph.vertices 
     .filter{case (vid,attr) => ! attr.toString.startsWith("C") }.map(_._1) 
) 

其餘的工作你有同樣的方式:

val solution1 = Graph(nodesAB, graph.edges) . 
subgraph(vpred = {case(id, label) => label != null}) 

如果你想使用DataFrames,這可能是更具可擴展性(?) ,那麼首先我們需要將nodesAB變成一個DataFrame:

val newNodes = sqlContext.createDataFrame(
    nodesAB, 
    StructType(Array(StructField("newNode", LongType, false))) 
) 

而你創造ED和數據框邊緣與此:

val edgeDf = sqlContext.createDataFrame(
    graph.edges.map{edge => Row(edge.srcId, edge.dstId, edge.attr)}, 
    StructType(Array(
    StructField("srcId", LongType, false), 
    StructField("dstId", LongType, false), 
    StructField("attr", LongType, false) 
)) 
) 

然後,您可以做到這一點沒有子圖以創建圖表:

val solution1 = Graph(
    nodesAB, 
    edgeDf 
    .join(newNodes, $"srcId" === $"newNode").select($"srcId", $"dstId", $"attr") 
    .join(newNodes, $"dstId" === $"newNode") 
    .rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getLong(2))) 
) 
+0

我喜歡您的解決方案,因爲它使用了collectNeighbors。 –

+0

謝謝。這令人驚訝地很難做到。我可能做的一件事情不同,我最近了解到'RDD.cogroup'。'collectNeighbors'只返回主節點的VertexId,而不是'attr'。如果我使用'cogroup'添加頂點屬性,我可能可以避免我的代碼中的'intersection'。然後我可以在第一個過濾器中過濾'startsWith(「C」)''。 –

+0

我結束了第三種方式。使用aggregateMessages,我發送一條「刪除我」消息給所有應該刪除的dst頂點。然後我通過1)outerJoinVertices和2)子圖步驟將這些頂點從圖中濾除。 –

0

一種解決方案是使用三重視圖來識別C1節點的鄰居的B節點的子集。接下來,將這些與A節點結合。接下來,創建一個新的圖表:

// Step 1 
// Compute the subset of B's that are neighbors with C1 
val nodesBC1 = graph.triplets . 
    filter {trip => trip.srcAttr == "C1"} . 
    map {trip => (trip.dstId, trip.dstAttr)} 

// Step 2  
// Union the subset B's with all the A's 
val nodesAB = nodesBC1 . 
    union(graph.vertices filter {case (id, label) => label == "A"}) 

// Step 3 
// Create a graph using the subset nodes and all the original edges 
// Remove nodes that have null values 
val solution1 = Graph(nodesAB, graph.edges) . 
    subgraph(vpred = {case(id, label) => label != null}) 

在步驟1 I重新創建的節點RDD(含B節點)通過映射一起三重視圖的dstID和dstAttr。不確定這對於大圖有多高效?

+0

在玩了幾個小時之後,我不確定是否有更好的方法來識別要移除的邊緣,而不是通過邊緣創建'attr'值爲'null'的頂點,然後使用'subgraph'做最終的修剪,至少不用像'DataFrames'或一個大的'RDD.cartesian'那樣做。 –

+0

很酷。感謝您試用@DavidGriffin :-) –

3

這裏是另一種解決方案。此解決方案使用aggregateMessages將整數(1)發送給應該從圖中刪除的那些B。生成的頂點集與圖形連接,隨後的子圖調用從輸出圖中刪除不需要的B。

// Step 1: send the message (1) to vertices that should be removed 
val deleteMe = graph.aggregateMessages[Int](
    ctx => { 
     if (ctx.dstAttr.equals("B") && ctx.srcAttr.equals("C")) { 
     ctx.sendToDst(1) // 1 means delete, but number is not actually used 
     } 
    }, 
    (a,b) => a // choose either message, they are all (1) 
) 

    // Step 2: join vertex sets, original and deleteMe 
    val joined = graph.outerJoinVertices(deleteMe) { 
    (id, origValue, msgValue) => msgValue match { 
     case Some(number) => "deleteme" // vertex received msg 
     case None => origValue 
    } 
    } 

    // Step 3: Remove nodes with domain = deleteme 
    joined.subgraph(vpred = (id, data) => data.equals("deleteme")) 

我正在考慮只使用一箇中間刪除標記的方法,例如, 「deleteme」,而不是1和「deleteme」。但這是一件好事,因爲我可以做到這一點。