
Spark - pickling error when accessing an RDD inside a loop

I have implemented the k-means algorithm in Spark. When I run the code below, I get the pickling error shown further down. If I move everything out of the loop instead, the centroids are computed correctly.

from pyspark import SparkContext
import re
import sys

sc = SparkContext(appName="Document Similarity")
lines = sc.wholeTextFiles(sys.argv[1])

articles = lines.flatMap(lambda x: re.split(r' ',x[1])) 

shingles = articles.flatMap(shingle_pairs.get_pairs) 

sig_vecs = shingles.groupBy(lambda x: x[1]) \ 
        .map(lambda x: sig_vector.create_vector(x, a, b, n, p)) 

centroids = k_means.init_centroids(sig_size, k) 

for i in range(max_it): 
    # assign documents to closest cluster 
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids)) 

    # get count by key to use in mean calculation for new clusters 
    doc_count = docs.countByKey() 

    # recompute cluster centroids 
    reduced_docs = docs.reduceByKey(k_means.reducer) 
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count)) 

The error is as follows:

pickle.PicklingError: Could not serialize object: Exception: 
It appears that you are attempting to broadcast an RDD or reference an 
RDD from an action or transformation. RDD transformations and actions 
can only be invoked by the driver, not inside of other transformations; 
for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid 
because the values transformation and count action cannot be performed 
inside of the rdd1.map transformation. For more information, see SPARK-5063. 

Answer


SPARK-5063 explains that "Spark does not support nested RDDs". Inside the map over sig_vecs (an RDD) you are trying to access centroids, which is itself an RDD from the second iteration onward, because the loop reassigns it with centroids = reduced_docs.map(...):

docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids)) 

Converting centroids to a local collection (with collect()) and adjusting classify_docs accordingly should solve the problem.
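
A minimal sketch of the adjusted loop, assuming classify_docs and mapper keep the signatures shown in the question (both are helpers from the asker's own k_means module):

for i in range(max_it):
    # assign documents to the closest cluster; centroids is a plain
    # Python list here, so the closure shipped to the executors no
    # longer captures an RDD
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))

    # get count by key to use in the mean calculation for the new clusters
    doc_count = docs.countByKey()

    # recompute the centroids and materialize them on the driver
    reduced_docs = docs.reduceByKey(k_means.reducer)
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count)) \
                            .collect()

If the centroid list is large, it could also be worth wrapping it with sc.broadcast(centroids) after collecting, so the list is shipped to each executor once rather than serialized with every task.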