2014-11-04 280 views
11

所以我想學習使用Python(Pyspark)的Spark。我想知道功能mapPartitions是如何工作的。這就是它所需要的輸入和它給出的輸出。我從互聯網上找不到任何適當的例子。比方說,我有一個包含列表的RDD對象,如下所示。pyspark mapPartitions函數是如何工作的?

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

我想從所有列表中刪除元素2,我將如何實現這一目標使用mapPartitions

回答

17

mapPartition應該被認爲是對分區的映射操作,而不是分區的元素。它的輸入是當前分區的集合,它的輸出將是另一組分區。

你通過地圖一定帶你去通過mapPartition必須將RDD類型的迭代您的RDD

功能的單個元素和其他一些或同一類型的返回和迭代函數。

在你的情況,你可能只想做這樣的事情

def filterOut2(line): 
    return [x for x in line if x != 2] 

filtered_lists = data.map(filterOut2) 

,如果你想使用mapPartition這將是

def filterOut2FromPartion(list_of_lists): 
    final_iterator = [] 
    for sub_list in list_of_lists: 
    final_iterator.append([x for x in sub_list if x != 2]) 
    return iter(final_iterator) 

filtered_lists = data.mapPartition(filterOut2FromPartion) 
+0

爲什麼不在filterOut2FromPartition中返回任何內容f結。其次,在python中最後是一些關鍵字嗎?我想你的意思是說final.iterator = []而不是final_iterator。 – MetallicPriest 2014-11-04 21:39:39

+0

解決了問題 – bearrito 2014-11-05 01:30:51

+0

我試圖實現這個,但我得到錯誤「列表對象不是迭代器」。另外,我認爲當你寫[x for x in line if x!= 2]時,我認爲你的意思是[x for x in list if x!= 2]。我在那裏使用了列表。 – MetallicPriest 2014-11-05 10:27:55

18

它更容易使用yield使用mapPartitions與發電機功能語法:

def filter_out_2(partition): 
    for element in partition: 
     if element != 2: 
      yield element 

filtered_lists = data.mapPartition(filter_out_2) 
+0

這比僅僅返回一個列表更快嗎? – cgreen 2017-01-03 22:05:16

+1

@cgreen該分區包含您的所有數據。我不確定你想要將所有數據加載到列表中。當您迭代數據時,生成器優先於列表。 – Narek 2017-01-03 22:40:28

+0

@cgreen生成器使用較少的內存,因爲它們根據需要生成每個項目,而不是最初必須生成整個對象列表。所以它絕對使用更少的內存,因此速度可能更快。 [這是Python中生成器的一個很好的解釋](https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db)。 – 2017-11-26 22:52:28