2016-09-26 16 views
0

我正在應用map,然後使用pyspark在RDD上進行reduceByKey轉換。我嘗試以下兩個語法,兩者似乎工作:pyspark:使用(,)和[,]爲reducedByKey的pair表示之間的區別

的情況下1:

my_rdd_out = my_rdd.map(lambda r: [r['my_id'], [[r['my_value']]]])\ 
           .reduceByKey(lambda a, b: a+b)\ 
           .map(lambda r: r[1]) 

的情況下2:

my_rdd_out = my_rdd.map(lambda r: (r['my_id'], [[r['my_value']]]))\ 
           .reduceByKey(lambda a, b: a+b)\ 
           .map(lambda r: r[1]) 

這裏的r是類from pyspark.sql import Row的。 在情況1中,地圖輸出對位於括號內;在情況2中,地圖輸出對在括號中。雖然兩者都有效,但我想知道使用[]和()來表示一對,這將成爲reduceByKey的輸入嗎?謝謝!

回答

1

Python中的tuplelist之間的區別在於tuple對象是不可變的,因此它們是可哈希的。 list對象不可散列,因爲它們可以使用其引用進行修改。你可以使用它們中的任何一個(或者reduceByKey方法不適用於元組和列表),這只是一個方便的方法,當你從某個調用者那裏獲取對象時,只需要遍歷集合,不關心它是什麼樣的集合)。

這裏是here

def reduceByKey(func, iterable): 
    """Reduce by key. 
    Equivalent to the Spark counterpart 
    Inspired by http://stackoverflow.com/q/33648581/554319 
    1. Sort by key 
    2. Group by key yielding (key, grouper) 
    3. For each pair yield (key, reduce(func, last element of each grouper)) 
    """ 
    get_first = lambda p: p[0] 
    get_second = lambda p: p[1] 
    # iterable.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(func))) 
    return map(
     lambda l: (l[0], reduce(func, map(get_second, l[1]))), 
     groupby(sorted(iterable, key=get_first), get_first) 
    ) 

解除在你的榜樣,你有tuple(<something>).reduceByKey(lambda <something>)reduceByKey的實現。顯然,可迭代的是tuplefunc是lambda表達式。

正如你所看到的,輸入只需要是一個可迭代的。索引訪問甚至不是必需的。

你可能已經通過setdeque,一個生成器的理解,不管。它不包含任何轉換到列表或元組。

它甚至不需要同時獲取所有數據,每次只能獲取一個(生成器函數/解析也可以):避免無用臨時對象創建的優雅方法。

這要求iterable只在函數中迭代一次,這在sorted函數中產生list

+0

那麼在執行reduceByKey之前,reduceByKey實際上是否將列表轉換爲元組?哪種格式更正式/標準?謝謝! – Edamame

+1

看到我的編輯(需要編輯,謝謝你)。沒有演員。並根據方便選擇你的格式。對於僅爲調用函數而創建的元組/列表,很難選擇,我同意。 –

+0

嗨讓弗朗索瓦,謝謝你的詳細信息。我想了解def reduceByKey(func,iterable)的實現。我的例子中應該是func和iterable:reduceByKey(lambda a,b:a + b)?非常感謝! – Edamame