2014-12-30 54 views
0

我被一個教程 http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html星火 - GraphX:mapReduceTriplets VS aggregateMessages

而在一些點上運行,我們使用mapReduceTriplets操作。這將返回預期的結果

// Find the oldest follower for each user 
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
    // For each edge send a message to the destination vertex with the attribute of the source vertex 
    edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))), 
    // To combine messages take the message for the older follower 
    (a, b) => if (a._2 > b._2) a else b 
) 

不過的IntelliJ點我mapReduceTriplets被棄用(如1.2.0),並應通過aggregateMessages更換

// Find the oldest follower for each user 
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages()[(String, Int)](
    // For each edge send a message to the destination vertex with the attribute of the source vertex 
    edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))), 
    // To combine messages take the message for the older follower 
    (a, b) => if (a._2 > b._2) a else b 
) 

所以我跑完全相同的代碼,但隨後我沒有任何輸出。這是預期的結果還是我應該改變一些由於aggregateMessages的cahnge?

回答

2

也許你需要的是這樣的:

val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)] 
(
    // For each edge send a message to the destination vertex with the attribute of the source vertex 
    sendMsg = { triplet => triplet.sendToDst(triplet.srcAttr.name, triplet.srcAttr.age) }, 
    // To combine messages take the message for the older follower 
    mergeMsg = {(a, b) => if (a._2 > b._2) a else b} 
) 

您可以在Grapx proggraming guide頁面發現aggregateMessages函數簽名和有用的例子。希望這可以幫助。