2015-11-19 23 views
5

我希望能夠一次一次(或以小批量)從RDD中返回一行,以便我可以根據需要在本地收集行。我的RDD足夠大,無法放入名稱節點的內存中,因此運行collect()會導致錯誤。在pyspark中收集帶有緩衝區的RDD

有沒有辦法重新創建collect()操作,但有一個生成器,以便來自RDD的行傳遞到緩衝區?另一種選擇是從緩存的RDD一次向take() 100000行,但我不認爲take()允許您指定一個開始位置?

+1

有什麼讓你想避免「saveAsTextFile」?因爲您可以將所有內容刷新到文件中,然後通過緩衝區讀取它。 –

+0

@ paul-k我目前使用saveAsTextFile,但是這有幾個問題:1)讀取時間慢,因爲這些是文本文件,2)我失去了關於數據類型的信息,所以'1'與1相同 – mgoldwasser

+0

這是真的2)仍然是一個問題,但你仍然可以編寫類型信息,即使這在文件大小方面不是很經濟。你也可以調用SaveAsPickleFile來序列化對象。 1)我不認爲這會比'collect'的實際實現慢,因爲它根據文檔從一個臨時文件讀取:ps://spark.apache.org/docs/0.7.2/api/pyspark /pyspark.rdd-pysrc.html#RDD.collect –

回答

5

最佳可用選項是使用RDD.toLocalIterator,該選項當時僅收集單個分區。它創建了一個標準的Python發生器:

rdd = sc.parallelize(range(100000)) 
iterator = rdd.toLocalIterator() 
type(iterator) 

## generator 

even = (x for x in iterator if not x % 2) 

可以調整使用特定的分割器和調整多個分區在單批收集的數據量。

不幸的是它帶有一個價格。要收集小批量,您必須啓動多個Spark作業,並且它非常昂貴。所以一般來說,當時收集元素不是一種選擇。

+0

只是想添加一個小記事,如果你想要一個每個分區返回一個列表的迭代器,那麼'glom()'很適合。 – numeral