2

鑑於1條十億記錄包含下列信息:如何用Spark找到最近鄰居的10億條記錄?

ID x1 x2 x3 ... x100 
    1 0.1 0.12 1.3 ... -2.00 
    2 -1 1.2 2 ... 3 
    ... 

對於每個ID以上,我想找到的前10最接近的ID,基於它們的矢量的歐幾里德距離(X1,X2,...,X100) 。

什麼是計算這個最好的方法?

+2

你嘗試過什麼?我們要求您向我們展示您到目前爲止所嘗試的內容,當您遇到困難或不明白錯誤並且文檔無法幫助時,我們將在此嘗試。此外,包含便於其他用戶複製並粘貼到自己的環境中的示例數據非常重要,因此他們可以在自己的環境中進行跟蹤。 –

回答

3

對所有記錄執行所有記錄的蠻力比較是一場失敗的戰鬥。我的建議是去現成的實現k-Nearest Neighbor算法,例如scikit-learn提供的算法,然後廣播得到的索引和距離數組,然後繼續。

步驟在這種情況下將是:

1-矢量化的特徵布萊斯建議,讓你的向量化方法作爲特徵與儘可能多的元件返回浮點值的列表(或numpy的陣列)

2 - 適合你的scikit學習神經網絡對您的數據:

nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data) 

3-運行在您的量化數據受過訓練的算法(訓練和查詢數據是一樣的,你的情況)

distances, indices = nbrs.kneighbors(qpa) 

步驟2和步驟3將在您的pyspark節點上運行,並且在此情況下不可並行化。您需要在此節點上擁有足夠的內存。在我的情況下,有150萬條記錄和4個功能,花了一兩秒鐘。

直到我們得到一個很好的火花NN的實現,我想我們將不得不堅持這些解決方法。如果你寧願喜歡嘗試新的東西,然後再爲http://spark-packages.org/package/saurfang/spark-knn

+3

實際上,您的答案*中的第3步是可並行化的:sklearn的k-NN kneighbors()方法可以使用Spark進行分發!我已經發布如何在這裏:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/ – xenocyon

+0

謝謝你的頭! – architectonic

1

您沒有提供很多的細節,但我想借此對這個問題的一般方法是:

  1. 轉換記錄到的數據結構類似像(ID,X1一LabeledPoint ..x100)作爲標籤和功能
  2. 映射每條記錄並將該記錄與所有其他記錄進行比較(此處有大量優化空間)
  3. 創建一些截斷邏輯,以便一旦開始比較ID = 5和ID = 1因爲您已經將ID = 1與ID = 5進行了比較,所以中斷計算
  4. 有些減少一步得到像{id_pair: [1,5], distance: 123}
  5. 另一個地圖一步一個數據結構來尋找

你已經確定pyspark和我一般使用斯卡拉做這類工作的每個記錄的10個近鄰,但每個步驟的一些僞代碼可能如下所示:

# 1. vectorize the features 
def vectorize_raw_data(record) 
    arr_of_features = record[1..99] 
    LabeledPoint(record[0] , arr_of_features) 

# 2,3 + 4 map over each record for comparison 
broadcast_var = [] 
def calc_distance(record, comparison) 
    # here you want to keep a broadcast variable with a list or dictionary of 
    # already compared IDs and break if the key pair already exists 
    # then, calc the euclidean distance by mapping over the features of 
    # the record and subtracting the values then squaring the result, keeping 
    # a running sum of those squares and square rooting that sum 
    return {"id_pair" : [1,5], "distance" : 123}  

for record in allRecords: 
    for comparison in allRecords: 
    broadcast_var.append(calc_distance(record, comparison)) 

# 5. map for 10 closest neighbors 

def closest_neighbors(record, n=10) 
    broadcast_var.filter(x => x.id_pair.include?(record.id)).takeOrdered(n, distance) 

psuedocode是可怕的,但我認爲它傳達了意圖。因爲您將所有記錄與所有其他記錄進行比較,因此將會進行很多混洗和排序。恕我直言,你希望將密鑰對/距離存儲在中心位置(例如廣播變量,儘管這很危險),以減少您執行的總歐幾里得距離計算。