2016-11-11 34 views
1
package com.mypackage 

import org.apache.spark.graphx._ 
import org.apache.spark.{SparkContext, SparkConf} 

/** 
    * Created by sidazhang on 11/8/16. 
    */ 
case class Person(age: Int) 

case class EdgeImpl() 

object GraphApp { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("SparkMain").setMaster("local[1]") 
    val sc = new SparkContext(conf) 

    val vertices = 
     sc.parallelize(Array((1L, Person(10)), (2L, Person(15)), 
     (3L, Person(20)), (4L, Person(30)))) 

    // Create an RDD for edges 
    val relationships = 
     sc.parallelize(Array(Edge(2L, 1L, EdgeImpl()), 
     Edge(3L, 1L, EdgeImpl()), Edge(4L, 1L, EdgeImpl()))) 
    val graph = Graph(vertices, relationships) 

    // Compute the number of older followers and their total age 
    val olderFollowers: VertexRDD[Array[Person]] = graph.aggregateMessages[Array[Person]](
     ctx => ctx.sendToDst(Array(ctx.srcAttr)), 
     // Merge the array of followers 
     (a, b) => a ++ b 
    ) 

    // Here I only have the id of the person and a list of his followers. 
    // How do I get the vertex of the person 
    olderFollowers.collect.foreach { case (id, followers) => followers.foreach(println(id, _)) } 
    } 
} 

問題是,通過aggregateMessage API,我最終得到了vertexId。我如何獲得實際的頂點。如何從aggregateMessages而不是隻是vertexId獲取實體

(現在的問題是內聯)

回答

0

你必須加入它的是原始數據:

graph.joinVertices(olderFollowers)(someMergingFunction).vertices 
+0

但是,這不是非常有用。 joinVertices要求您合併回原始頂點。所以在我的情況下,我實際上失去了我想要擁有的一羣追隨者 – samol

+0

(人,追隨者) – samol

相關問題