2012-09-24 51 views
2

只需簡單地通過發送連接字段作爲簡化鍵,便可輕鬆地通過單個鍵連接數據集。 但是,通過幾個鍵來連接記錄,其中至少有一個相同的記錄對我來說並不那麼容易。如何實現OR加入hadoop(燙傷/級聯)

比如我的日誌,我想通過用戶參數組他們,我想(ip地址,的sessionId,visitorCockies)將這些

所以LOG1應的log 2進行分組,如果log1.ip ==的log 2。 ip或者log1.session = log2.session或者log1.cockie = log2.coockie。也許有可能創建組合密鑰或一些概率方法,如minHash ...

這可能嗎?

回答

0

問題是MapReduce連接通常是通過給某些字段上匹配的記錄使用相同的reduce鍵來實現的,以便它們被髮送到同一個reducer。因此,任何解決這個問題的方法都會有些破綻,但這是可能的...

下面是我會推薦的:對於每個輸入記錄,生成三個副本,每個副本都帶有一個新的「鍵」字段以來自它的字段爲前綴。因此,例如,說你有以下輸入:

(ip=1.2.3.4, session=ABC, cookie=123) 
(ip=3.4.5.6, session=DEF, cookie=456) 

,那麼你會產生

(ip=1.2.3.4, session=ABC, cookie=123, key=ip_1.2.3.4) 
(ip=1.2.3.4, session=ABC, cookie=123, key=session_ABC) 
(ip=1.2.3.4, session=ABC, cookie=123, key=cookie_123) 
(ip=3.4.5.6, session=DEF, cookie=456, key=ip_3.4.5.6) 
(ip=3.4.5.6, session=DEF, cookie=456, key=session_DEF) 
(ip=3.4.5.6, session=DEF, cookie=456, key=cookie_456) 

然後你可以簡單地在這個新的領域組。

我並不太熟悉燙傷/級聯(儘管我一直想要了解更多關於它的內容),但這肯定會符合Hadoop中通常如何進行連接。

+0

這樣,我會得到3個不同的重疊羣(每個鍵repectivelly相等),所以我需要一種方法將它們合併到單個組 – yura

+0

@yura如果左記錄可以與多個正確的記錄,然後加入通常是加入會讓他們沒有參與(如此重複的價值)。原因是合併會導致未定義的元組大小(表格寬度),您可能會得到1,2或3個正確的記錄。因此這個解決方案是正確的,(但缺乏細節和基本的實現;)。 – samthebest

0

按照上面的Joe所述創建單獨的連接後,您需要刪除重複項。如果您在「OR-join」中使用的所有字段中的值相同,則數據中的兩個元組是重複的。因此,如果您之後對代表所有相關字段的鍵進行自然連接,則會將所有重複項組合在一起。因此,你可以用一個單獨的元組來替換它們。我們來看一個例子:假設你已經有字段(A,B,C,D)的元組,並且你感興趣的字段是A,B和C.你首先要做equi-join分別在A,B和C上。對於每一個,你都會自己加入初始元組流。用(A0,B0,C0,D0)表示第一個流,用(A1,B1,C1,D1)表示第二個流。結果將是元組(A0,B0,C0,D0,A1,B1,C1,D1)。對於每個元組,您都可以創建一個元組(A0A1B0B1C0C1,A0,B0,C0,D0,A1,B1,C1,D1),這樣所有的副本都將在後續的縮減器中組合在一起。對於每個組,只返回其中一個包含的元組。

0

你能描述更多關於「通過幾個鍵加入記錄」嗎?

如果您知道工作流中可以連接特定鍵的點,最好的方法可能是定義一個具有多個連接的流,而不是嘗試操縱複雜的數據結構來解析N個鍵一步。

這裏是一個示例應用程序,它展示瞭如何處理不同類型的連接在級聯:https://github.com/Cascading/CoPA

0

級聯,我結束了創建,如果內部的任何條件的輸出或是真實的,其檢查過濾器。級聯濾波器輸出可以選擇使用的True/False值。

0

提示:使用類型別名,讓您的燙碼好的閱讀

注意0:該解決方案是特別好的,因爲它總是會僅有1 mapred工作,即使有多個鍵,加入。

注1:假設每個管道沒有重複的鍵,否則你必須使'鍵也有一個索引,它來自哪個日誌,而mapTo將是一個flatMapTo並且有點複雜。

注意2:爲了簡單起見,這將放棄連接字段,讓他們你需要一個很大的醜陋元組(ip1,ip2,session1,session2,...等)。如果你真的想要,我可以寫出一個例子來保持它們。

注3:如果你真的想合併重複的值,你可以用每個logEntry1和logEntry2的GROUPBY按照此,產生logEntryList,然後貓(如在評論中提及這是不正常的加入)。這將創建2個更多的mapred作業。

type String2 = (String, String) 
type String3 = (String, String, String) 

def addKey(log: Pipe): Pipe = log.flatMap[String3, String](('ip, 'session, 'cookie) -> 'key)(
    _.productIterator.toList.zipWithIndex.map { 
    case (key: String, index: Int) => index.toString + key 
    } 
) 

(addKey(log1) ++ addKey(log2)).groupBy('key)(_.toList[String]('logEntry -> 'group)) 
.mapTo[Iterable[String], String2]('group -> ('logEntry1, 'logEntry2))(list => (list.head, list.last))