2015-11-15 69 views
1

我是Spark的新手。我試圖實現tf-idf。我需要計算每個文檔中每個單詞出現的次數以及每個文檔中的單詞總數。火花,關於reduceByKey的小問題

我想減少和可能的另一個操作,但我不知道如何。 這裏是我的輸入:

對的形式是(documentName , (word, wordCount))前。

("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)), 
    ("doc2",("have", 5)) 

鍵是文檔和值是單詞,該單詞在該文檔中出現多少次。我想計算每個文檔中的總詞數並可能計算該詞的百分比。

輸出我想:

("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)), 
    ("doc2", (("a", 5),10)), ("doc2", (("have", 5),10)) 

我得到

corpus.join(corpus.reduceByKey(lambda x, y : x[1]+y[1])) 

起點的效果:

collect_of_docs = [(doc1,text1), (doc2, text2),....] 

def count_words(x): 
    l = [] 
    words = x[1].split() 
    for w in words: 
     l.append(((w, x[0]), 1)) 
    return l 

sc = SparkContext() 
corpus = sc.parallelize(collect_of_docs) 
input = (corpus 
    .flatMap(count_words) 
    .reduceByKey(add) 
    .map(lambda ((x,y), z) : (y, (x,z)))) 

如果可能,我想只作一個減少一個棘手的操作操作員也許。任何幫助表示讚賞:)在此先感謝。

回答

1

一般來說,flatMap只是爲了稍後收集您的數據沒有意義。我假設你的數據看起來或多或少是這樣的:

collect_of_docs = sc.parallelize([ 
    (1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."), 
    (2, "Mauris magna sem, vehicula sed dictum finibus, posuere id ipsum."), 
    (3, "Duis eleifend molestie dolor, quis fringilla eros facilisis ac.")]) 

首先,我們將使用一個基本的正則表達式需要一些助手和Counter

from __future__ import division # If for some reason you use Python 2.x 
import re 
from collections import Counter 

def count_words(doc, pattern=re.compile("\w+")): 
    """Given a tuple (doc_id, text) 
    return a tuple (doc_id, tokens_count 

    >>> count_words((1, "Foo bar bar.")) 
    (1, Counter({'Foo': 1, 'bar': 2})) 
    """ 
    (doc_id, text) = doc 
    return (doc_id, Counter(pattern.findall(text))) 

def compute_tf(cnt): 
    """Convert term counter to term frequency 

    >>> compute_tf(Counter({'Foo': 1, 'bar': 2})) 
    {'Foo': 0.3333333333333333, 'bar': 0.6666666666666666} 
    """ 
    n = sum(cnt.values()) 
    return {k: v/n for (k, v) in cnt.items()} 

和最終結果:

tfs = (collect_of_docs 
    .map(count_words) 
    .mapValues(compute_tf)) 

tfs.sortByKey().first() 

## (1, 
## {'Lorem': 0.125, 
## 'adipiscing': 0.125, 
## 'amet': 0.125, 
## 'consectetur': 0.125, 
## 'dolor': 0.125, 
## 'elit': 0.125, 
## 'ipsum': 0.125, 
## 'sit': 0.125}) 

使用上述文件頻率可計算如下:

from operator import add 

dfs = (tfs 
    .values() 
    .flatMap(lambda kv: ((k, 1) for k in kv.keys())) 
    .reduceByKey(add)) 

dfs.sortBy(lambda x: -x[1]).take(5) 

## [('ipsum', 2), 
## ('dolor', 2), 
## ('consectetur', 1), 
## ('finibus', 1), 
## ('fringilla', 1)] 
+0

是的,這就是我想要的:))我想出於好奇而多了一件事。假設我想爲每個單詞計算tf-idf分數,並以doc1 word1 score1 word2 score2的形式展示它們; doc2 word1 score1 word2 score2 word3 score3; ..........這裏最有效的方法是什麼?我想我只是迭代tfs鍵和相應的單詞,並從dfs中查找單詞idf。你會怎麼做? – dogacanb

+0

這取決於。如果唯一條目的數量相對較少,那麼您可以收集dfs,以dict形式廣播,並通過tfs收集flatMap。否則,我會flatMap tfs到'(term,(doc_id,freq))')並且加入'dfs'。 – zero323

+1

即使看起來不一樣,它也有很大的不同。使用廣播變量不需要洗牌。所以這是一個本地操作。如果dfs很大,那麼廣播就成了一個限制因素,而混洗/散列連接就成了一個更好的解決方案。 – zero323