所以我想學習使用Python(Pyspark)的Spark。我想知道功能mapPartitions
是如何工作的。這就是它所需要的輸入和它給出的輸出。我從互聯網上找不到任何適當的例子。比方說,我有一個包含列表的RDD對象,如下所示。pyspark mapPartitions函數是如何工作的?
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
我想從所有列表中刪除元素2,我將如何實現這一目標使用mapPartitions
。
所以我想學習使用Python(Pyspark)的Spark。我想知道功能mapPartitions
是如何工作的。這就是它所需要的輸入和它給出的輸出。我從互聯網上找不到任何適當的例子。比方說,我有一個包含列表的RDD對象,如下所示。pyspark mapPartitions函數是如何工作的?
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
我想從所有列表中刪除元素2,我將如何實現這一目標使用mapPartitions
。
mapPartition應該被認爲是對分區的映射操作,而不是分區的元素。它的輸入是當前分區的集合,它的輸出將是另一組分區。
你通過地圖一定帶你去通過mapPartition必須將RDD類型的迭代您的RDD
功能的單個元素和其他一些或同一類型的返回和迭代函數。
在你的情況,你可能只想做這樣的事情
def filterOut2(line):
return [x for x in line if x != 2]
filtered_lists = data.map(filterOut2)
,如果你想使用mapPartition這將是
def filterOut2FromPartion(list_of_lists):
final_iterator = []
for sub_list in list_of_lists:
final_iterator.append([x for x in sub_list if x != 2])
return iter(final_iterator)
filtered_lists = data.mapPartition(filterOut2FromPartion)
它更容易使用yield
使用mapPartitions與發電機功能語法:
def filter_out_2(partition):
for element in partition:
if element != 2:
yield element
filtered_lists = data.mapPartition(filter_out_2)
這比僅僅返回一個列表更快嗎? – cgreen 2017-01-03 22:05:16
@cgreen該分區包含您的所有數據。我不確定你想要將所有數據加載到列表中。當您迭代數據時,生成器優先於列表。 – Narek 2017-01-03 22:40:28
@cgreen生成器使用較少的內存,因爲它們根據需要生成每個項目,而不是最初必須生成整個對象列表。所以它絕對使用更少的內存,因此速度可能更快。 [這是Python中生成器的一個很好的解釋](https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db)。 – 2017-11-26 22:52:28
爲什麼不在filterOut2FromPartition中返回任何內容f結。其次,在python中最後是一些關鍵字嗎?我想你的意思是說final.iterator = []而不是final_iterator。 – MetallicPriest 2014-11-04 21:39:39
解決了問題 – bearrito 2014-11-05 01:30:51
我試圖實現這個,但我得到錯誤「列表對象不是迭代器」。另外,我認爲當你寫[x for x in line if x!= 2]時,我認爲你的意思是[x for x in list if x!= 2]。我在那裏使用了列表。 – MetallicPriest 2014-11-05 10:27:55