1
我試圖在我的數據集上運行連接組件算法,但在方向圖上運行。我不希望連接的組件橫向在邊緣的兩個方向上。如何使用Spark Graphx或Graphframe創建方向圖
這是我的示例代碼
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.Edge
import org.apache.spark.sql._
import org.graphframes._
object CCTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName("cc_test")
implicit val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = sparkSession.sparkContext
val vertex = sparkSession.createDataFrame(sc.parallelize(Array(
(1L, "b4fcde907cbd290b7e16", 28),
(2L, "cda389612d6b37674cb1", 27),
(3L, "1a6a6e3fd2daaeeb2a05", 65),
(4L, "9a007eee210a47e58047", 42),
(5L, "e91898d39bf5f8501827", 55),
(6L, "ceab58c59d23549d3f4b", 50),
(12L, "ceab58c59asd3549d3f4b", 50),
(14L, "ceab508c59d23549d3f4b", 55),
(15L, "ceab508c59d23541d3f4b", 51)
))).toDF("id", "similar_hash", "size")
val edges = sparkSession.createDataFrame(sc.parallelize(Array(
Edge(2L, 1L, 0.7f),
Edge(2L, 4L, 0.2f),
Edge(3L, 2L, 0.4f),
Edge(3L, 6L, 0.3f),
Edge(4L, 1L, 0.1f),
Edge(5L, 2L, 0.2f),
Edge(5L, 3L, 0.8f),
Edge(5L, 6L, 0.3f),
Edge(12L, 14L, 1.3f),
Edge(15L, 14L, 1.3f) //< - should not be connected except (14L, 15L)
))).toDF("src", "dst", "attr")
val graph = GraphFrame(vertex, edges)
val cc = graph.connectedComponents.run()
cc.show()
sparkSession.stop()
}
}
結果:
+---+--------------------+----+---------+
| id| similar_hash|size|component|
+---+--------------------+----+---------+
| 6|ceab58c59d23549d3f4b| 50| 1|
| 5|e91898d39bf5f8501827| 55| 1|
| 1|b4fcde907cbd290b7e16| 28| 1|
| 3|1a6a6e3fd2daaeeb2a05| 65| 1|
| 12|ceab58c59asd3549d...| 50| 12|
| 2|cda389612d6b37674cb1| 27| 1|
| 4|9a007eee210a47e58047| 42| 1|
| 14|ceab508c59d23549d...| 55| 12|
| 15|ceab508c59d23541d...| 51| 12| <- should be in separate cluster
+---+--------------------+----+---------+
我怎麼能這樣實現嗎?
['stronglyConnectedComponents'](https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](numIter:Int):org。 apache.spark.graphx.Graph [org.apache.spark.graphx.VertexId,ED])。 – zero323
@ zero323 我不認爲SCC可以在這裏使用(糾正我,如果我錯了),因爲它需要每個節點都可以從其他節點到達。但基本上,我需要做一個遞歸如SQL遞歸查詢。 例如: 一個 - > B, 乙 - > C, 米 - >乙 強CC: 集= {A,B}, 集= {B,C}, 集= {m,b} 連接的組件: 這將是set ='{a,b,d,m}' - notice和'm'連接到節點'a - > b - > m'太 但在遞歸 a→b→c = {a,b,c} 和 m→b = {m,b} 這給出了兩組{a,b,c}和{m,b} –
OK。那麼爲什麼'{a,b,c}','{m,b}'而不是'{a,b,c}','{m,b,c}'?無論如何,它看起來像你需要Pregel API。 – zero323