2017-10-12 94 views
0

我有具有以下結構的RDD:
((ByteArray, Idx), ((srcIdx,srcAdress), (destIdx,destAddress)))如何創建上的RDD(斯卡拉)嵌套for循環

此比特幣blockchain的邊緣(事務)的表示。 (ByteArray, Idx)可以看作是一個標識符,其餘的是一個邊緣。我的最終目標是聚合區塊鏈圖形表示中的節點。對此我需要做的第一次修改是將同一個比特幣交易中的資源放在一個邊緣(最終在一個節點中)。通過這種方式,我將「羣集」屬於同一用戶的公鑰。 此修改的結果將具有以下結構:
((ByteArray, Idx), (List((srcIdx, srcAddress)), (destIdx, destAddress)))
或者以任何其他形式具有相同的功能(例如,如果這在Scala中是不可能的或邏輯的)。

我目前的思維過程如下。在Java中,我會對RDD中的項目執行嵌套for循環,每個循環都爲具有相同密鑰的項目創建列表((ByteArray, Idx))。刪除任何重複項後。 但是,由於我正在處理RDD和Scala,所以這是不可能的。接下來,我嘗試在我的RDD上執行.collect(),然後單獨使用.map()函數,並使用集合在我的映射函數中循環。但是,Spark不喜歡這樣,因爲顯然集合不能被序列化。 接着我試圖創建一個「嵌套」地圖功能如下:

val aggregatedTransactions = joinedTransactions.map(f => { 
    var list = List[Any](f._2._1) 

    val filtered = joinedTransactions.filter(t => f._1 == t._1) 

    for(i <- filtered){ 
    list ::= i._2._1 
    } 

    (f._1, list, f._2._2) 
}) 

這是不允許的,因爲該過濾器(或映射)的功能是不可用的.MAP()。有什麼選擇?

我對Scala相當陌生,所以任何有用的背景信息都非常感謝。

+0

我認爲,鑑於你的問題的性質,這將是有益的,提供輸入+輸出的例子,以避免誤解 – dk14

回答

5

我的最終目標是在區塊鏈的圖形表示中聚合節點。對此我需要做的第一次修改是將同一個比特幣交易中的資源放在一個邊緣(最終在一個節點中)。

所以基本上你想groupByKey

joinedTransactions.groupByKey().map { 
    // process data to get desired shape 
} 
-1

嵌套RDDS是不可能的。然而RDD內藏品是 可能。

嵌套for循環可以使用cartesian

DEF笛卡爾[U](其他:RDD [U])(隱式爲arg0:ClassTag [U]):RDD [(T, U) ]永久鏈接返回此RDD的笛卡爾乘積和另一個 之一,即所有元素對(a,b)的RDD,其中a在 this中,而b在另一箇中。

val nestedForRDD = rdd1.cartesian(rdd2) 

nestedForRDD.map((rdd1TypeVal, rdd2TypeVal) => { 
    //Do your inner-nested evaluation code here 
}) 

使用星火SQL還可以實現它。

http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/