2017-04-07 51 views
1

是否有可能加盟星火2個RDDS上的自定義功能? 我有兩個大字符串作爲關鍵的RDD。我希望他們不要採用經典的加入加入,但如自定義功能:加入自定義函數的兩個RDDS - SPARK

def my_func(a,b): 
    return Lev.distance(a,b) < 2 

result_rdd = rdd1.join(rdd2, my_func) 

如果這是不可能的,沒有任何替代方案,將繼續使用火花集羣的好處是什麼? 我寫了類似的東西,但是pyspark將無法分配我的小羣集上的工作。提前(和對不起我的英語,因爲我是意大利人)

def custom_join(rdd1, rdd2, my_func): 
    a = rdd1.sortByKey().collect() 
    b = rdd2.sortByKey().collect() 
    i = 0 
    j = 0 
    res = [] 
    while i < len(a) and j < len(b): 
     if my_func(a[i][0],b[j][0]): 
      res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))] 
      i+=1 
      j+=1 
     elif a[i][0] < b[j][0]: 
      i+=1 
     else: 
      j+=1 

    return sc.parallelize(res) 

感謝

回答

2

您可以使用笛卡爾,然後篩選根據條件。

from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate() 
sc = spark.sparkContext 
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2), ("b", 3)]) 

def customFunc(x): 
    # You may use any condition here 
    return x[0][0] ==x[1][0] 

print(x.join(y).collect()) # normal join 
# replicating join with cartesian 
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect()) 

輸出:

[('b', (4, 3)), ('a', (1, 2))] 
[('a', (1, 2)), ('b', (4, 3))] 
+0

謝謝,但我覺得比起加入笛卡兒積將是非常低效的。我正在處理一個包含大約2M條目的數據庫。 –

+0

是否可以使用數據框API? – Himaprasoon

+0

數據框是否與羣集計算兼容? –