2017-05-27 127 views
1

是否可以在Python中廣播RDD?如何在PySpark中廣播RDD?

我正在關注「高級分析與Spark:規模化數據學習模式」一書,第3章需要廣播RDD。我試圖用Python代替Scala來追蹤這些例子。

無論如何,就算在這個簡單的例子我有一個錯誤:

my_list = ["a", "d", "c", "b"] 
my_list_rdd = sc.parallelize(my_list) 
sc.broadcast(my_list_rdd) 

錯誤時正在:

"It appears that you are attempting to broadcast an RDD or reference an RDD from an " 
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an 
action or transformation. RDD transformations and actions can only be invoked by the driver, n 
ot inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) i 
s invalid because the values transformation and count action cannot be performed inside of the 
rdd1.map transformation. For more information, see SPARK-5063. 

我真的不明白什麼是「行動或改造」錯誤是指至。我正在使用spark-2.1.1-hadoop2.7

重要編輯:這本書是正確的。我只是沒有看到它不是正在播放的RDD,而是通過collectAsMap()獲得的地圖版本。

謝謝!

回答

2

Is it possible to broadcast an RDD in Python?

TL; DR

當你想到RDD 真的是,你會發現這是根本不可能的。 RDD中沒有任何內容可以播放。這太脆弱了(可以這麼說)。

RDD是一個數據結構,描述了對某些數據集的分佈式計算。通過RDD的功能,您可以描述什麼以及如何計算。這是一個抽象的實體。

引用的RDD的scaladoc:

Represents an immutable, partitioned collection of elements that can be operated on in parallel

Internally, each RDD is characterized by five main properties:

  • A list of partitions

  • A function for computing each split

  • A list of dependencies on other RDDs

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

有沒有什麼你可以爲(報價SparkContext.broadcast方法的scaladoc)播出:

broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T] Broadcast a read-only variable to the cluster, returning a org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.

你只能播放一個真正的價值,而是一個RDD是隻有當執行者處理其數據時纔可用的值的容器

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

而且在同一文檔中後來:

This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

你可以然而collect數據集的RDD 持有和如下播放它:

my_list = ["a", "d", "c", "b"] 
my_list_rdd = sc.parallelize(my_list) 
sc.broadcast(my_list_rdd.collect) // <-- collect the dataset 

在「收集數據集「步驟,數據集將離開RDD空間併成爲本地可用的集合,這是一個可以進行廣播的Python值。

+1

很好的答案,但對於使用大型RDD收集警告,特別是針對新用戶的警告總是必不可少的。 – eliasah

1

您無法播放RDD。您向處理RDD時多次使用的所有執行程序節點廣播值。所以在你的代碼中,你應該在廣播之前收集你的RDD。 collectRDD轉換爲可以毫無問題地廣播的本地python對象。

sc.broadcast(my_list_rdd.collect()) 

當您廣播一個值時,該值被序列化並通過網絡發送到所有執行程序節點。您的my_list_rdd只是對分佈在多個節點上的RDD的引用。序列化此引用並將此引用廣播到所有工作節點並不意味着工作節點中的任何內容。所以你應該收集你的RDD的值並且廣播這個值。在星火廣播

的更多信息,可以發現here

注:如果RDD太大,應用程序可能會遇到一個內存溢出錯誤。 collect方法將通常不夠大的驅動程序內存中的所有數據全部提取出來。

+1

@eliasah完成。添加警告 –