2012-09-12 40 views
4

假設我有兩個鍵值數據集 - 數據集A和B,我們稱之爲它們。我想用集合B中的數據更新集合A中的所有數據,其中兩個數據與鍵匹配。使用Hadoop通過兩個數據集進行映射

因爲我正在處理大量的數據,所以我使用Hadoop來處理MapReduce。我擔心的是,爲了在A和B之間進行這種關鍵匹配,我需要將所有集合A(大量數據)加載到每個映射器實例的內存中。這似乎相當低效。

會不會有推薦的方法來做到這一點,每次都不需要重複加載工作?

一些pseudcode澄清目前我在做什麼:

Load in Data Set A # This seems like the expensive step to always be doing 
Foreach key/value in Data Set B: 
    If key is in Data Set A: 
     Update Data Seta A 

回答

3

根據該文件,MapReduce框架包括the following steps

  1. 地圖
  2. 排序/分區
  3. 結合(可選)
  4. 減少

您已經描述了一種執行連接的方法:將每個映射器中的所有集合A加載到內存中。你是正確的,這是低效的。

請注意,如果兩個集合都按鍵排序和分區,則可以將大連接分割成任意多個較小的連接。 MapReduce在上面的步驟(2)中通過鍵對每個映射器的輸出進行排序。 Sorted Map輸出然後按鍵分區,以便每個Reducer創建一個分區。對於每個唯一鍵,Reducer將接收來自Set A和Set B的所有值。

要完成加入,Reducer只需要輸出密鑰和Set B中的更新值(如果存在);否則,從Set A輸出密鑰和原始值。要區分來自Set A和Set B的值,請嘗試在Mapper的輸出值上設置一個標誌。

+0

如果我理解正確,你說的Hadoop'-input'應該是-both-設置A和B(帶有一個標誌來區分它們之間的值),然後在縮小步驟中,將被分組在一起,這樣就可以加入它們。那是對的嗎? – babonk

+0

對,你需要將A和B都輸入到你的映射器中,並確保你輸出了一個標記,以便減速器能夠區分這兩者。我還假設你正在編寫你自己的'Map'(實現'mapred.Mapper')和'Reduce'(實現'mapred.Reducer')類。像@ joe-k在一個單獨的答案中說,Pig,Hive和Cascading都是合法的解決方案,它將爲您生成此代碼並在Hadoop上運行。 – Shahin

+1

您可以使用布隆過濾器進行小型優化,如果集合A比集合B大很多,並且存在與集合B不匹配並且集合B不適合內存的記錄,那麼這將很有用。創建一個MapReduce作業,它在Set B上訓練Bloom Filter,而不是在最終的MapReduce Job中執行所有建議的操作,但是在使用Bloom Filter的Mapper過濾器Set A中,這將減少Set A中的溢出記錄的數量有匹配的B組記錄,因此你的工作可能會運行得更快。 – alexeipab

2

Cloudera的This video tutorial給出了一個關於如何通過MapReduce從大約12分鐘開始進行大規模加入的很好的描述。
以下是他爲將文件B中的記錄連接到關鍵字K上的文件A中的記錄以及僞代碼而規定的基本步驟。如果這裏有任何不清楚的地方,我建議觀看視頻,因爲他比我更擅長解釋視頻。

在您的映射:

K from file A: 
    tag K to identify as Primary Key 
    emit <K, value of K> 

K from file B: 
    tag K to identify as Foreign Key 
    emit <K, record> 

寫分揀機和石斑魚將忽略PK/FK標籤,讓您的記錄,無論他們是否是PK記錄或FK發送到相同的減速記錄並分組在一起。

寫一個比較器,比較PK和FK鍵並首先發送PK。

該步驟的結果將是所有具有相同密鑰的記錄將被髮送到同一個Reducer並處於相同的一組值中以減少。帶有PK標記的記錄將是第一個,然後是來自需要被連接的B的所有記錄。現在,減速機:

value_of_PK = values[0] // First value is the value of your primary key 
for value in values[1:]: 
    value.replace(FK,value_of_PK) // Replace the foreign key with the key's value 
    emit <key, value> 

這樣做的結果將是文件B,由K的文件A.值替代k的所有事件還可以擴展,以實現一個完整的內部連接,或把兩個文件全部寫出來用於直接數據庫存儲,但一旦你得到這個工作,這些都是非常微不足道的修改。

+0

不錯。將它標記爲A和B中的哈希值而不是密鑰本身不起作用嗎?然後,我不需要編寫自己的Sorter和Grouper,並且可以在Reduce步驟中循環訪問該組,以查看它們是否存在 – babonk

+0

@babonk對不起,不知道你的意思。你是指在散列階段標記值? – HypnoticSheep

+0

你似乎建議標記K,而不是K的值。我只是說它可能更容易標記值,所以我不需要寫我自己的分類器/石斑魚 – babonk

3

到目前爲止發佈的所有答案都是正確的 - 這應該是一個Reduce方面的加入...但是沒有必要重新發明輪子!你有沒有考慮過PigHiveCascading?他們都加入了內置的功能,並且相當優化。

+0

哇,很酷。現在我要堅持使用Hadoop,但我可能會學習這些框架中的一個 – babonk