1
package com.mypackage
import org.apache.spark.graphx._
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by sidazhang on 11/8/16.
*/
case class Person(age: Int)
case class EdgeImpl()
object GraphApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkMain").setMaster("local[1]")
val sc = new SparkContext(conf)
val vertices =
sc.parallelize(Array((1L, Person(10)), (2L, Person(15)),
(3L, Person(20)), (4L, Person(30))))
// Create an RDD for edges
val relationships =
sc.parallelize(Array(Edge(2L, 1L, EdgeImpl()),
Edge(3L, 1L, EdgeImpl()), Edge(4L, 1L, EdgeImpl())))
val graph = Graph(vertices, relationships)
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[Array[Person]] = graph.aggregateMessages[Array[Person]](
ctx => ctx.sendToDst(Array(ctx.srcAttr)),
// Merge the array of followers
(a, b) => a ++ b
)
// Here I only have the id of the person and a list of his followers.
// How do I get the vertex of the person
olderFollowers.collect.foreach { case (id, followers) => followers.foreach(println(id, _)) }
}
}
問題是,通過aggregateMessage API,我最終得到了vertexId。我如何獲得實際的頂點。如何從aggregateMessages而不是隻是vertexId獲取實體
(現在的問題是內聯)
但是,這不是非常有用。 joinVertices要求您合併回原始頂點。所以在我的情況下,我實際上失去了我想要擁有的一羣追隨者 – samol
(人,追隨者) – samol