1
我是Spark的新手。我試圖實現tf-idf。我需要計算每個文檔中每個單詞出現的次數以及每個文檔中的單詞總數。火花,關於reduceByKey的小問題
我想減少和可能的另一個操作,但我不知道如何。 這裏是我的輸入:
對的形式是(documentName , (word, wordCount))
前。
("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)),
("doc2",("have", 5))
鍵是文檔和值是單詞,該單詞在該文檔中出現多少次。我想計算每個文檔中的總詞數並可能計算該詞的百分比。
輸出我想:
("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)),
("doc2", (("a", 5),10)), ("doc2", (("have", 5),10))
我得到
corpus.join(corpus.reduceByKey(lambda x, y : x[1]+y[1]))
起點的效果:
collect_of_docs = [(doc1,text1), (doc2, text2),....]
def count_words(x):
l = []
words = x[1].split()
for w in words:
l.append(((w, x[0]), 1))
return l
sc = SparkContext()
corpus = sc.parallelize(collect_of_docs)
input = (corpus
.flatMap(count_words)
.reduceByKey(add)
.map(lambda ((x,y), z) : (y, (x,z))))
如果可能,我想只作一個減少一個棘手的操作操作員也許。任何幫助表示讚賞:)在此先感謝。
是的,這就是我想要的:))我想出於好奇而多了一件事。假設我想爲每個單詞計算tf-idf分數,並以doc1 word1 score1 word2 score2的形式展示它們; doc2 word1 score1 word2 score2 word3 score3; ..........這裏最有效的方法是什麼?我想我只是迭代tfs鍵和相應的單詞,並從dfs中查找單詞idf。你會怎麼做? – dogacanb
這取決於。如果唯一條目的數量相對較少,那麼您可以收集dfs,以dict形式廣播,並通過tfs收集flatMap。否則,我會flatMap tfs到'(term,(doc_id,freq))')並且加入'dfs'。 – zero323
即使看起來不一樣,它也有很大的不同。使用廣播變量不需要洗牌。所以這是一個本地操作。如果dfs很大,那麼廣播就成了一個限制因素,而混洗/散列連接就成了一個更好的解決方案。 – zero323