Spark - PicklingError when referencing an RDD inside a loop

I have implemented the k-means algorithm in Spark. When I run the code below, I get a pickling error (shown below). If I modify the code and move everything outside the loop, the centroids are computed correctly.
sc = SparkContext(appName="Document Similarity")
lines = sc.wholeTextFiles(sys.argv[1])
articles = lines.flatMap(lambda x: re.split(r' ', x[1]))
shingles = articles.flatMap(shingle_pairs.get_pairs)
sig_vecs = shingles.groupBy(lambda x: x[1]) \
                   .map(lambda x: sig_vector.create_vector(x, a, b, n, p))
centroids = k_means.init_centroids(sig_size, k)

for i in range(max_it):
    # assign documents to closest cluster
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))
    # get count by key to use in mean calculation for new clusters
    doc_count = docs.countByKey()
    # recompute cluster centroids
    reduced_docs = docs.reduceByKey(k_means.reducer)
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count))
The error is as follows:
pickle.PicklingError: Could not serialize object: Exception:
It appears that you are attempting to broadcast an RDD or reference an
RDD from an action or transformation. RDD transformations and actions
can only be invoked by the driver, not inside of other transformations;
for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid
because the values transformation and count action cannot be performed
inside of the rdd1.map transformation. For more information, see SPARK-5063.
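For context on why the loop fails: after the first iteration, `centroids` is no longer a local object but an RDD (the result of `reduced_docs.map(...)`), so the next iteration's `sig_vecs.map` lambda closes over an RDD inside a transformation, which is exactly what SPARK-5063 forbids. The usual remedy is to materialize the new centroids on the driver each iteration (e.g. with `collect()`), so the closure captures a plain Python list. A minimal self-contained sketch of that driver-side loop pattern, using toy 1-D data and hypothetical names in place of the question's `sig_vecs` and `k_means` helpers:

```python
# Toy illustration of the k-means driver loop: centroids are kept as a
# plain Python list (what collect() would return), never an RDD handle.
def classify(point, centroids):
    # assign a point to the index of its nearest centroid
    return min(range(len(centroids)), key=lambda i: abs(point - centroids[i]))

def kmeans(points, centroids, max_it=10):
    for _ in range(max_it):
        # the "map" step: tag each point with its closest cluster
        # (in the Spark version, sig_vecs.map with centroids as a local list)
        assignments = [(classify(p, centroids), p) for p in points]
        # the "reduceByKey" + "countByKey" steps: per-cluster sum and count
        sums, counts = {}, {}
        for c, p in assignments:
            sums[c] = sums.get(c, 0.0) + p
            counts[c] = counts.get(c, 0) + 1
        # recompute centroids as a plain list -- the analogue of
        # reduced_docs.map(...).collect() in the Spark version
        centroids = [sums[c] / counts[c] for c in sorted(sums)]
    return centroids

points = [1.0, 2.0, 3.0, 8.0, 9.0, 10.0]
print(kmeans(points, [2.5, 7.5]))  # → [2.0, 9.0]
```

In the question's own code, the same idea would amount to ending each iteration with `centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count)).collect()`, at the cost of one driver round-trip per iteration.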