我希望能夠一次一次(或以小批量)從RDD中返回一行,以便我可以根據需要在本地收集行。我的RDD足夠大,無法放入名稱節點的內存中,因此運行collect()
會導致錯誤。在pyspark中收集帶有緩衝區的RDD
有沒有辦法重新創建collect()
操作,但有一個生成器,以便來自RDD的行傳遞到緩衝區?另一種選擇是從緩存的RDD一次向take()
100000行,但我不認爲take()
允許您指定一個開始位置?
我希望能夠一次一次(或以小批量)從RDD中返回一行,以便我可以根據需要在本地收集行。我的RDD足夠大,無法放入名稱節點的內存中,因此運行collect()
會導致錯誤。在pyspark中收集帶有緩衝區的RDD
有沒有辦法重新創建collect()
操作,但有一個生成器,以便來自RDD的行傳遞到緩衝區?另一種選擇是從緩存的RDD一次向take()
100000行,但我不認爲take()
允許您指定一個開始位置?
最佳可用選項是使用RDD.toLocalIterator
,該選項當時僅收集單個分區。它創建了一個標準的Python發生器:
rdd = sc.parallelize(range(100000))
iterator = rdd.toLocalIterator()
type(iterator)
## generator
even = (x for x in iterator if not x % 2)
可以調整使用特定的分割器和調整多個分區在單批收集的數據量。
不幸的是它帶有一個價格。要收集小批量,您必須啓動多個Spark作業,並且它非常昂貴。所以一般來說,當時收集元素不是一種選擇。
只是想添加一個小記事,如果你想要一個每個分區返回一個列表的迭代器,那麼'glom()'很適合。 – numeral
有什麼讓你想避免「saveAsTextFile」?因爲您可以將所有內容刷新到文件中,然後通過緩衝區讀取它。 –
@ paul-k我目前使用saveAsTextFile,但是這有幾個問題:1)讀取時間慢,因爲這些是文本文件,2)我失去了關於數據類型的信息,所以'1'與1相同 – mgoldwasser
這是真的2)仍然是一個問題,但你仍然可以編寫類型信息,即使這在文件大小方面不是很經濟。你也可以調用SaveAsPickleFile來序列化對象。 1)我不認爲這會比'collect'的實際實現慢,因爲它根據文檔從一個臨時文件讀取:ps://spark.apache.org/docs/0.7.2/api/pyspark /pyspark.rdd-pysrc.html#RDD.collect –