鑑於1條十億記錄包含下列信息:如何用Spark找到最近鄰居的10億條記錄?
ID x1 x2 x3 ... x100
1 0.1 0.12 1.3 ... -2.00
2 -1 1.2 2 ... 3
...
對於每個ID以上,我想找到的前10最接近的ID,基於它們的矢量的歐幾里德距離(X1,X2,...,X100) 。
什麼是計算這個最好的方法?
鑑於1條十億記錄包含下列信息:如何用Spark找到最近鄰居的10億條記錄?
ID x1 x2 x3 ... x100
1 0.1 0.12 1.3 ... -2.00
2 -1 1.2 2 ... 3
...
對於每個ID以上,我想找到的前10最接近的ID,基於它們的矢量的歐幾里德距離(X1,X2,...,X100) 。
什麼是計算這個最好的方法?
對所有記錄執行所有記錄的蠻力比較是一場失敗的戰鬥。我的建議是去現成的實現k-Nearest Neighbor算法,例如scikit-learn
提供的算法,然後廣播得到的索引和距離數組,然後繼續。
步驟在這種情況下將是:
1-矢量化的特徵布萊斯建議,讓你的向量化方法作爲特徵與儘可能多的元件返回浮點值的列表(或numpy的陣列)
2 - 適合你的scikit學習神經網絡對您的數據:
nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)
3-運行在您的量化數據受過訓練的算法(訓練和查詢數據是一樣的,你的情況)
distances, indices = nbrs.kneighbors(qpa)
步驟2和步驟3將在您的pyspark節點上運行,並且在此情況下不可並行化。您需要在此節點上擁有足夠的內存。在我的情況下,有150萬條記錄和4個功能,花了一兩秒鐘。
直到我們得到一個很好的火花NN的實現,我想我們將不得不堅持這些解決方法。如果你寧願喜歡嘗試新的東西,然後再爲http://spark-packages.org/package/saurfang/spark-knn
實際上,您的答案*中的第3步是可並行化的:sklearn的k-NN kneighbors()方法可以使用Spark進行分發!我已經發布如何在這裏:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/ – xenocyon
謝謝你的頭! – architectonic
您沒有提供很多的細節,但我想借此對這個問題的一般方法是:
{id_pair: [1,5], distance: 123}
你已經確定pyspark和我一般使用斯卡拉做這類工作的每個記錄的10個近鄰,但每個步驟的一些僞代碼可能如下所示:
# 1. vectorize the features
def vectorize_raw_data(record)
arr_of_features = record[1..99]
LabeledPoint(record[0] , arr_of_features)
# 2,3 + 4 map over each record for comparison
broadcast_var = []
def calc_distance(record, comparison)
# here you want to keep a broadcast variable with a list or dictionary of
# already compared IDs and break if the key pair already exists
# then, calc the euclidean distance by mapping over the features of
# the record and subtracting the values then squaring the result, keeping
# a running sum of those squares and square rooting that sum
return {"id_pair" : [1,5], "distance" : 123}
for record in allRecords:
for comparison in allRecords:
broadcast_var.append(calc_distance(record, comparison))
# 5. map for 10 closest neighbors
def closest_neighbors(record, n=10)
broadcast_var.filter(x => x.id_pair.include?(record.id)).takeOrdered(n, distance)
psuedocode是可怕的,但我認爲它傳達了意圖。因爲您將所有記錄與所有其他記錄進行比較,因此將會進行很多混洗和排序。恕我直言,你希望將密鑰對/距離存儲在中心位置(例如廣播變量,儘管這很危險),以減少您執行的總歐幾里得距離計算。
碰巧,我有一個解決的辦法,涉及星火結合sklearn:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/
它的要點是:
你嘗試過什麼?我們要求您向我們展示您到目前爲止所嘗試的內容,當您遇到困難或不明白錯誤並且文檔無法幫助時,我們將在此嘗試。此外,包含便於其他用戶複製並粘貼到自己的環境中的示例數據非常重要,因此他們可以在自己的環境中進行跟蹤。 –