2014-04-21 41 views
1

我想了解mrjob更好mrjob:示例如何自動知道如何在文本文件中查找行?

from mrjob.job import MRJob 
class MRWordFrequencyCount(MRJob): 

    def mapper(self, _, line): 
     yield "chars", len(line) 
     yield "words", len(line.split()) 
     yield "lines", 1 

    def reducer(self, key, values): 
     yield key, sum(values) 
if __name__ == '__main__': 
    MRWordFrequencyCount.run() 

我通過

$ python word_count.py my_file.txt 

運行的例子,它按預期工作,但我不明白它是如何自動知道它會讀取一個文本文件並按每行分割。我不確定_會做什麼。

據我所知,mapper()爲每一行生成三個鍵/值對是否正確?如果我想處理文件夾中的每個文件,該怎麼辦?

而且reducer()自動知道如何將每個鍵的值加起來?

如果我想通過map reduce運行單元測試,那麼mapper和reducer會是什麼樣子?它甚至有必要嗎?

回答

3

映射器方法接收已經從輸入文本解析出來的鍵值對。 mrjob使用Hadoop流媒體,每個輸入文本被新的行字符分隔,然後根據使用的輸入協議將每行分割爲鍵值對。這是框架爲你處理的事情,所以你不必做任何繁重的工作;你可以假設你會得到正確的鑰匙和價值。

但是,您確實需要指定指定哪種輸入文本文件。例如,如果鍵和/或值不是純文本(如原始問題),而是序列化的JSON,則使用JSONProtocol/JSONValueProtocol等,而不是默認的RawValueProtocol。

對於初始映射器,每一行讀入值(通過RawValueProtocol),所以這就是爲什麼你沒有收到密鑰。使用_僅僅是一個未使用的虛擬變量的Python約定。 (但是,_實際上是一個Python變量的有效名稱,你可以這樣做:a = 3; _ = 2; b = a + _。褻瀆,不是嗎?)

mrjob可以接受多個輸入文件。你可以做,例如

$ python wordcount.py text1.txt text2.txt 

如果你想所有的文本文件作爲輸入到mrjob工作,你可以做這樣的事情

$ python wordcount.py inputdir/*.txt 

或者只是

$ python wordcount.py inputdir 

和所有的選擇的文件被用作輸入。

Reducer接收到的是一個鍵和與該鍵關聯的所有值的迭代器。因此,如果您舉例,reducer方法中的變量values是一個迭代器。如果你想對所有值做些什麼,你需要實際遍歷所有的值。在問題的具體示例中,內置函數sum可以將迭代器作爲參數,這就是爲什麼您可以一次完成它。但它實際上與sum([value for value in values])類似。

我其實不知道你會如何單元測試mrjob腳本。在生產運行之前,我通常只對一小部分測試數據進行測試。

+0

所以如果有人解析JSON文件,它應該看起來像'{ '鍵1': 'VALUE1'} { 'KEY2': '值'}'這裏爲txt文件只是有'值1 \ n值2 \ n值3 \ n'? – KJW

+0

使用mrjob實現,鍵和值都可以是JSON對象。實際上,我認爲它是通過JSONProtocol在內部傳遞的。並且鍵值對在序列化時由製表符分隔。因此,對於JSON協議,例如,一行看起來像''鍵'\ t {「somelist」:[1,2,3]} \ n'。這實際上就是爲什麼你的輸出,鍵是雙引號的原因,因爲它在JSON中以字符串形式輸出。對於初始輸入,如果使用JSONValueProtocol,那麼您希望輸入文件看起來像「{」L1「:1} \ n {」L2「:2} \ n {」L3「:3}'等等。每行只包含該協議的值。 –

+0

我其實建議你閱讀關於輸入協議的mrjob文檔。這是非常重要且有用的,但如果您不熟悉如何在內部處理輸入/輸出/鍵值,可能會引起混淆。 –

2

我對mrjob瞭解不多,所以我會做一些假設。首先,_表示忽略密鑰(在Google搜索之後進行驗證)。其次,我假設它可以在逗號分隔的文件列表或目錄上工作。接下來,這段代碼沒有安裝程序,可能是因爲這些是默認的方法名稱。我確定你是否給你的映射器或reducer命名了一些不同的mrjob不能自動拾取的東西。

我發現了一些例子here

0
from mrjob.job import MRJob 

class MRRatingCounter(MRJob): 
    def mapper(self, key, line): 


     (userID, movieID, rating, timestamp) = line.split('\t') 
     yield rating, 1 

    def reducer(self, rating, occurences): 
     yield rating, sum(occurences) 

if __name__ == '__main__': 
    MRRatingCounter.run() 

所以,請大家指正基於以上論述的:

請糾正我,如果我錯了,所以在這種情況下,關鍵是採取輸入文件的值,在這種情況下,我可以在我的腦海裏讀取它像這樣:

def mapper(self {this is the object/instance},key {在這種情況下,輸入文本文件的文件的名稱,並正確的我ml-100k/u.data,行{這是試圖從數據文件每次傳遞給mapper(),)

另一個從Udemy我在哪裏努力學習的代碼是由問原來的問題: 類MRFriendsByAge(MRJob):

def mapper(self, _, line): 
    (ID, name, age, numFriends) = line.split(',') 
    yield age, float(numFriends) 

def reducer(self, age, numFriends): 
    total = 0 
    numElements = 0 
    for x in numFriends: 
     total += x 
     numElements += 1 

    yield age, total/numElements 

如果 == '主要': MRFriendsByAge.run()

Found a MR job book and am trying to see if its make sense but i am still struggling

相關問題