2015-03-30 72 views
1

以下程序在zip步驟中失敗。如何在pySpark中分離後壓縮

x = sc.parallelize([1, 2, 3, 1, 2, 3]) 
y = sc.parallelize([1, 2, 3]) 
z = x.distinct() 
print x.zip(y).collect() 

產生的錯誤取決於是否指定了多個分區。

我明白

兩個RDDS [必須]具有相同數目的分區和相同數量的每個分區中的元素。

解決此限制的最佳方法是什麼?

我一直在用下面的代碼執行操作,但我希望能找到更有效率的東西。

def safe_zip(left, right): 
    ix_left = left.zipWithIndex().map(lambda row: (row[1], row[0])) 
    ix_right = right.zipWithIndex().map(lambda row: (row[1], row[0])) 
    return ix_left.join(ix_right).sortByKey().values() 

回答

0

我認爲這將通過使用笛卡爾()在您的RDD來實現

import pyspark 
x = sc.parallelize([1, 2, 3, 1, 2, 3]) 
y = sc.parallelize([1, 2, 3]) 
x.distinct().cartesian(y.distinct()).collect() 
相關問題