I followed the post below to run KMeans in parallel. I am using Python 2.7 and Spark 2.0.2 on EMR. The problem: PySpark ml.KMeans fits submitted in parallel from separate threads overwrite each other's K.
How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
As that post says, the jobs should not affect each other:
"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By 'job', in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark's scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)." http://spark.apache.org/docs/latest/job-scheduling.html
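My reading of that paragraph is the pattern sketched below (a minimal illustration only, not my actual job; it assumes a live spark session, as in the EMR PySpark shell): several independent actions submitted against the same SparkContext from a thread pool, each triggering its own Spark job.

from multiprocessing.pool import ThreadPool

def count_range(n):
    # Each call triggers one Spark job (the count action) on the shared SparkContext.
    return spark.range(n).count()

pool = ThreadPool(processes=4)
print(pool.map(count_range, [10, 100, 1000, 10000]))  # expected: [10, 100, 1000, 10000]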
However, the number of clusters K in the resulting models differs from what was passed in.
Code:
from pyspark.ml.clustering import KMeans
from sklearn.datasets.samples_generator import make_blobs
from pyspark.ml.linalg import Vectors
import random
random.seed(1)
group_size = 30
n_groups = 20
n_samples= n_groups * group_size
n_features=2
n_centers=4
xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
x_groups = []
for i in range(n_groups):
    x_groups.append(xs[i*group_size: (i+1)*group_size])
def do_kmean(xs):
    # Build a one-column DataFrame of feature vectors and fit KMeans with a randomly chosen K.
    data = []
    for x in xs:
        data.append((Vectors.dense(x.tolist()),))
    df = spark.createDataFrame(data, ["features"])
    num_clusters = random.randint(5, 10)
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")
    model = kmeans.fit(df)
    # Return the K that was requested vs. the K the estimator reports after fitting.
    return [num_clusters, kmeans.getK()]
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=8)
result = tpool.map(do_kmean, x_groups)
Result (the K that was passed in vs. what KMeans actually used):
[[5, 9],
[8, 9],
[6, 8],
[10, 9],
[7, 9],
[9, 9],
[7, 9],
[9, 9],
[5, 5],
[5, 9],
[9, 7],
[9, 9],
[5, 7],
[10, 5],
[7, 7],
[7, 7],
[6, 6],
[10, 10],
[10, 10],
[5, 5]]
It seems Spark is not thread/process safe and the parallel fits are accessing other threads' copies of K. Is there any Spark configuration that could cause this, or is this a Spark bug?
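For what it's worth, one way to cross-check which K each fit actually used (a sketch only, not part of the run above; it reuses the same imports, spark session, tpool and x_groups) would be to also count the cluster centers held by the fitted model, in addition to reading kmeans.getK() from the estimator:

def do_kmean_checked(xs):
    # Same as do_kmean above, but also reports how many centers the fitted model holds.
    data = [(Vectors.dense(x.tolist()),) for x in xs]
    df = spark.createDataFrame(data, ["features"])
    num_clusters = random.randint(5, 10)
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1,
                    featuresCol="features", predictionCol="prediction")
    model = kmeans.fit(df)
    # num_clusters: the K requested; kmeans.getK(): what the estimator reports after fitting;
    # len(model.clusterCenters()): how many centers the model actually learned.
    return [num_clusters, kmeans.getK(), len(model.clusterCenters())]

result_checked = tpool.map(do_kmean_checked, x_groups)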