2017-08-08 154 views
1

我試圖建立一個通用的方法來計算許多稀疏向量(長度爲250k的100k向量)的距離矩陣。在我的例子中,數據以scipy csr矩陣表示。這是我在做什麼:pyspark計算稀疏向量的距離矩陣

首先,我定義的方法來改造社會責任行pyspark SparseVectors:

def csr_to_sparse_vector(row): 
    return SparseVector(row.shape[1], sorted(row.indices), row.data) 

現在我行轉變爲載體,並將其保存到一個列表,我然後供給到SparkContext:

sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample] 
rdd = sc.parallelize(sparse_vectors) 

在我使用笛卡爾函數來建立所有對(類似於此信息:Pyspark calculate custom distance between all vectors in a RDD)下一步

在這個實驗我想TJE Jaccard相似使用進行相應的定義:

def jacc_sim(pair): 
    dot_product = pair[0].dot(pair[1]) 
    try: 
     sim = dot_product/(pair[0].numNonzeros() + pair[1].numNonzeros()) 
    except ZeroDivisionError: 
     return 0.0 
    return sim 

現在我應該只映射功能,並收集結果:

distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect() 

我在一個小樣本運行此代碼只有100個文件,本地機器和180個節點的羣集。任務需要永久並最終崩潰:https://pastebin.com/UwLUXvUZ

任何建議可能是錯誤的?如果距離度量是對稱sim(x,y)== sim(y,x),我們只需要矩陣的上三角形。我發現,通過過濾(Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`)解決了這個問題後:

rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1]) 

但是,這並不爲SparseVectors名單工作。

回答

1

問題是導致將我的數據分割爲1000個分區的配置錯誤。解決的辦法是簡單地告訴火花,他應該明確多少分區創建(如10):

rdd = sc.parallelize(sparse_vectors, 10) 

而且我伸出稀疏向量列表與枚舉,這樣我就能過濾掉對它們不屬於上面的三角矩陣:

的屬於相似的功能如下:

def jacc_sim(pair): 
    id_0 = pair[0][0] 
    vec_0 = pair[0][1] 
    id_1 = pair[1][0] 
    vec_1 = pair[1][1] 
    dot_product = vec_0.dot(vec_1) 
    try: 
     sim = dot_product/(vec_0.numNonzeros() + vec_1.numNonzeros()) 
     if sim > 0: 
      return (id_0, id_1, sim) 
    except ZeroDivisionError: 
     pass 
    return None 

這個工作對我非常好,我希望別人WIL我覺得它也很有用!

0

它是有問題的列表,還是SparseVectors包含列表?一個想法是嘗試將SparseVectors轉換爲DenseVectors,這是我在這裏找到的一個建議(Convert Sparse Vector to Dense Vector in Pyspark)。計算結果沒有什麼不同,只是Spark如何處理它。

+0

Hej @MisterJT,謝謝你花時間。我的火花配置有問題,導致了崩潰。 – nadre

+0

@nadre,很高興你找到它。是特定於火花庫還是機器的配置。 – MisterJT