2017-02-27 90 views
1

我試圖在我的數據集上運行連接組件算法,但在方向圖上運行。我不希望連接的組件橫向在邊緣的兩個方向上。如何使用Spark Graphx或Graphframe創建方向圖

這是我的示例代碼

import org.apache.log4j.{Level, LogManager} 
import org.apache.spark.SparkConf 
import org.apache.spark.graphx.Edge 
import org.apache.spark.sql._ 
import org.graphframes._ 

object CCTest { 

def main(args: Array[String]) { 

    val sparkConf = new SparkConf() 
     .setMaster("local[2]") 
    .setAppName("cc_test") 


    implicit val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 

    val sc = sparkSession.sparkContext 

    val vertex = sparkSession.createDataFrame(sc.parallelize(Array(
    (1L, "b4fcde907cbd290b7e16", 28), 
    (2L, "cda389612d6b37674cb1", 27), 
    (3L, "1a6a6e3fd2daaeeb2a05", 65), 
    (4L, "9a007eee210a47e58047", 42), 
    (5L, "e91898d39bf5f8501827", 55), 
    (6L, "ceab58c59d23549d3f4b", 50), 
    (12L, "ceab58c59asd3549d3f4b", 50), 
    (14L, "ceab508c59d23549d3f4b", 55), 
    (15L, "ceab508c59d23541d3f4b", 51) 
))).toDF("id", "similar_hash", "size") 

    val edges = sparkSession.createDataFrame(sc.parallelize(Array(
      Edge(2L, 1L, 0.7f), 
      Edge(2L, 4L, 0.2f), 
      Edge(3L, 2L, 0.4f), 
      Edge(3L, 6L, 0.3f), 
      Edge(4L, 1L, 0.1f), 
      Edge(5L, 2L, 0.2f), 
      Edge(5L, 3L, 0.8f), 
      Edge(5L, 6L, 0.3f), 
      Edge(12L, 14L, 1.3f), 
      Edge(15L, 14L, 1.3f) //< - should not be connected except (14L, 15L) 
     ))).toDF("src", "dst", "attr") 


    val graph = GraphFrame(vertex, edges) 

    val cc = graph.connectedComponents.run() 

    cc.show() 



    sparkSession.stop() 

} 

} 

結果:

+---+--------------------+----+---------+ 
| id|  similar_hash|size|component| 
+---+--------------------+----+---------+ 
| 6|ceab58c59d23549d3f4b| 50|  1| 
| 5|e91898d39bf5f8501827| 55|  1| 
| 1|b4fcde907cbd290b7e16| 28|  1| 
| 3|1a6a6e3fd2daaeeb2a05| 65|  1| 
| 12|ceab58c59asd3549d...| 50|  12| 
| 2|cda389612d6b37674cb1| 27|  1| 
| 4|9a007eee210a47e58047| 42|  1| 
| 14|ceab508c59d23549d...| 55|  12| 
| 15|ceab508c59d23541d...| 51|  12| <- should be in separate cluster 
+---+--------------------+----+---------+ 

我怎麼能這樣實現嗎?

+1

['stronglyConnectedComponents'](https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](numIter:Int):org。 apache.spark.graphx.Graph [org.apache.spark.graphx.VertexId,ED])。 – zero323

+0

@ zero323 我不認爲SCC可以在這裏使用(糾正我,如果我錯了),因爲它需要每個節點都可以從其他節點到達。但基本上,我需要做一個遞歸如SQL遞歸查詢。 例如: 一個 - > B, 乙 - > C, 米 - >乙 強CC: 集= {A,B}, 集= {B,C}, 集= {m,b} 連接的組件: 這將是set ='{a,b,d,m}' - notice和'm'連接到節點'a - > b - > m'太 但在遞歸 a→b→c = {a,b,c} 和 m→b = {m,b} 這給出了兩組{a,b,c}和{m,b} –

+0

OK。那麼爲什麼'{a,b,c}','{m,b}'而不是'{a,b,c}','{m,b,c}'?無論如何,它看起來像你需要Pregel API。 – zero323

回答

0

我可能會誤解,但我的解決方案直接來自於connectedComponentssource code。在第54行,系統本身只要求預凝膠迭代器,與重點線是

val pregelGraph = Pregel(ccGraph, initialMessage, maxIterations,EdgeDirection.Either)

通過簡單地改變EdgeDirection到更合適的參數(可以發現here),這可能會爲你工作。

相關問題