2017-06-03 33 views
1

什麼是可以傳遞給SparkContext.parallelize來創建RDD的元素的限制?更具體地說,如果我使用Python創建自定義類,那麼需要實現哪些方法才能確保它在RDD中正常工作?我假設它需要實施__eq____hash__並且可以分揀。還有什麼?有關文件的鏈接將不勝感激。我無法找到任何地方。什麼樣的對象可以是Spark RDD中的元素?

回答

1

嚴格地說,唯一的硬性要求是類是可序列化的(可挑選的),儘管對於生命週期僅限於單個任務(既不混洗也不收集/並行化)的對象來說不是必需的。

一致__hash____eq__僅需要如果類將(在byKey操作作爲密鑰)被用作混洗鍵,直接或間接地(例如,用於distinctcache)。

此外,類定義必須可以在每個工作節點上導入,因此模塊必須已經存在於PYTHONPATHpyFiles中。如果類依賴於本地依賴關係,那麼它們也必須存在於每個工作節點上。

最後,對於排序類型,必須使用標準Python語義進行定製。

總結:

  • 無特殊要求,除了被導入的:

    class Foo: 
        ... 
    
    # objects are used locally inside a single task 
    rdd.map(lambda i: Foo(i)).map(lambda foo: foo.get(i)) 
    
  • 必須是可序列化:

    # Has to be pickled to be distributed 
    sc.parallelize([Foo(1), Foo(2)]) 
    
    # Has to be pickled to be persisted 
    sc.range(10).map(lambda i: Foo(i)).cache() 
    
    # Has to be pickled to be fetched to the driver 
    sc.range(10).map(lambda i: Foo(i)).collect() # take, first, etc. 
    
  • 必須是Hashable

    # Explicitly used as a shuffle key 
    sc.range(10).map(lambda i: (Foo(i), 1)).reduceByKey(add) # *byKey 
    
    # Implicitly used as a shuffle kye 
    sc.range(10).map(lambda i: Foo(i)).distinct # subtract, etc. 
    

此外,所有通過閉包傳遞的變量都必須是可序列化的。

+0

非常好,謝謝! –