今天,我問我同樣的情況,多處理模塊提供ThreadPool
,它爲您產生了一些線程,因此可以並行運行這些工作。實例化函數,然後創建池,然後在想要迭代的範圍上創建它。
在我的情況下,我計算了不同數量的中心(超參數調整)的這些WSSSE數字,以獲得「良好」的k均值聚類...就像它在MLSpark documentation中概述的那樣。如果沒有進一步的解釋,這裏有一些細胞從我IPython的工作表:
from pyspark.mllib.clustering import KMeans
import numpy as np
c_points被12dim陣列:
>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([ 7, -1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])]
在下面,每個i
我計算這一WSSSE值並返回它作爲一個元組:
def error(point, clusters):
center = clusters.centers[clusters.predict(point)]
return np.linalg.norm(point - center)
def calc_wssse(i):
clusters = KMeans.train(c_points, i, maxIterations=20,
runs=20, initializationMode="random")
WSSSE = c_points\
.map(lambda point: error(point, clusters))\
.reduce(lambda x, y: x + y)
return (i, WSSSE)
這裏開始有趣的部分:
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)
運行:
wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points
給出:
[(1, 195318509740785.66),
(2, 77539612257334.33),
(3, 78254073754531.1),
...
]
自從得到答案嗎?我試圖做同樣的事情,並認爲它實際上是不可能的,直到更好的鎖定被添加到'SparkContext's。 –
@MikeSukmanowsky你是什麼意思?這篇文檔沒有提到特定的Spark API,它似乎適用於所有這些API。使用任何API時運行的實際代碼是Scala代碼以及Java和Python的一些接口代碼。 – Dici
你能提供這個聲明來自哪裏的鏈接嗎? – Jon