2014-02-19 61 views
2

我正在尋找很長時間來解決我的問題,但幾乎沒有發現任何幫助。 希望你們中的一些人能給我一個小費。PIG FILTER與下一行的關係相同的關係

我有以下格式的關係的:用戶名,時間戳,IP

例如:

Harald 2014-02-18T16:14:49.503Z 123.123.123.123 
Harald 2014-02-18T16:14:51.503Z 123.123.123.123 
Harald 2014-02-18T16:14:55.503Z 321.321.321.321 

,我想看看,誰在不到5秒鐘改變了他的IP地址。所以第二和第三行應該很有趣。

我想通過用戶名組合關係,並且想要將實際行的時間戳與下一行進行比較。如果IP地址不相同,並且時間戳小於5秒,則應該在輸出處。

有人可以幫我解決這個問題嗎?

關於。


首先我想感謝你的時間。

但我實際上停留在Sessionize部分。

這是我的數據正在添加:

aoebcu 2014-02-19T14:23:17.503Z 220.61.65.25 
aoebcu 2014-02-19T14:23:14.503Z 222.117.144.19 
aoebcu 2014-02-19T14:23:14.503Z 222.117.144.19 
jekgru 2014-02-19T14:23:14.503Z 213.56.157.109 
zmembx 2014-02-19T14:23:12.503Z 199.188.198.91 
qhixcg 2014-02-19T14:23:11.503Z 203.40.104.119 

和我的代碼到現在看起來是這樣的:

hijack_Reduced = FOREACH finalLogs GENERATE ClientUserName, timestamp, OriginalClientIP; 
hijack_Filtered = FILTER hijack_Reduced BY OriginalClientIP != '-'; 

hijack_Sessionized = FOREACH (GROUP hijack_Filtered BY ClientUserName) { 
    views = ORDER hijack_Filtered BY timestamp; 
    GENERATE FLATTEN(Sessionize(views)) AS (ClientUserName,timestamp,OriginalClientIP,session_id); 
} 

但是當我運行該腳本,我得到了以下錯誤消息:

15:36:22錯誤 - org.apache.pig.tools.pigstats.SimplePigStats.setBackendException(542) |執行[POUserFunc(名稱: POUserFunc(datafu.pig.sessions.Sessionize)[bag] - scope-199 Operator Key:scope-199)children:null at []]時發生異常: java.lang。 IllegalArgumentException:格式無效:「aoebcu」

我已經嘗試了很多,但沒有任何工作。 你有想法嗎?

問候

+0

你需要編寫一個UDF。 –

+0

你可以看看使用esper – robthewolf

回答

1

雖然你可以寫一個UDF對於這一點,你實際上可以利用的UDF在Apache DataFu已經可以解決這個問題。

我的解決方案涉及將會話化應用於數據。基本上你會看到連續的事件併爲每個事件分配一個會話ID。如果兩個事件之間的時間超過了指定的時間(在您的情況下爲5秒),則下一個事件將獲得新的會話ID。否則,連續的事件會獲得相同的會話ID。一旦爲每個事件分配了其會話ID,其餘都很容易。我們按會話ID進行分組,並查找具有多個不同IP地址的會話。

我會穿過我的解決方案。

假設您有以下輸入數據。哈羅德和庫馬爾都改變了他們的IP地址。但哈羅德在5秒鐘內完成,而庫馬爾則沒有。所以我們腳本的輸出應該只是簡單的「哈羅德」。

Harold,2014-02-18T16:14:49.503Z,123.123.123.123 
    Harold,2014-02-18T16:14:51.503Z,123.123.123.123 
    Harold,2014-02-18T16:14:55.503Z,321.321.321.321 
    Kumar,2014-02-18T16:14:49.503Z,123.123.123.123 
    Kumar,2014-02-18T16:14:55.503Z,123.123.123.123 
    Kumar,2014-02-18T16:15:05.503Z,321.321.321.321 

加載數據

data = LOAD 'input' using PigStorage(',') 
     AS (user:chararray,time:chararray,ip:chararray); 

現在從DataFu定義一對夫婦的UDF。如前所述,UDF執行會話。 DistinctBy UDF將用於在每個會話中查找不同的IP地址。

define Sessionize datafu.pig.sessions.Sessionize('5s'); 

define DistinctBy datafu.pig.bags.DistinctBy('1'); 

按用戶分組數據,按時間排序並應用Sessonize UDF。請注意,時間戳必須是第一個字段,因爲這是Sessionize期望的。此UDF將會話ID附加到每個元組。

data = FOREACH data GENERATE time,user,ip; 

data_sessionized = FOREACH (GROUP data BY user) { 
    views = ORDER data BY time; 
    GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id); 
} 

既然數據已被會話化,我們可以按用戶和會話進行分組。我也被用戶分組,因爲我想把這個值吐出來。我們將這些事件傳遞給DistinctBy UDF。查看此UDF的文檔以獲取更詳細的說明。但基本上,我們將獲得儘可能多的元組,因爲每個會話有不同的IP地址。請注意,我已經從下面的關係中刪除了時間。這是因爲1)它不是必需的,2)DataFu 1.2.0中的DistinctBy在處理包含短劃線的字段時存在一個錯誤,就像時間字段一樣。

data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id; 

data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE 
    group.user as user, 
    SIZE(DistinctBy(data_sessionized)) as distinctIpCount; 

現在選擇所有具有多個不同IP地址的會話,並返回這些會話的不同用戶。

data_sessionized = FILTER data_sessionized BY distinctIpCount > 1; 

data_sessionized = FOREACH data_sessionized GENERATE user; 

data_sessionized = DISTINCT data_sessionized; 

這只是生產:

Harold 

以下是完整的源代碼,您應該能夠直接粘貼到DataFu單元測試和運行:

/** 
    define Sessionize datafu.pig.sessions.Sessionize('5s'); 

    define DistinctBy datafu.pig.bags.DistinctBy('1'); -- distinct by ip 

    data = LOAD 'input' using PigStorage(',') AS (user:chararray,time:chararray,ip:chararray); 

    data = FOREACH data GENERATE time,user,ip; 

    data_sessionized = FOREACH (GROUP data BY user) { 
    views = ORDER data BY time; 
    GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id); 
    } 

    data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id; 

    data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE 
    group.user as user, 
    SIZE(DistinctBy(data_sessionized)) as distinctIpCount; 

    data_sessionized = FILTER data_sessionized BY distinctIpCount > 1; 

    data_sessionized = FOREACH data_sessionized GENERATE user; 

    data_sessionized = DISTINCT data_sessionized; 

    STORE data_sessionized INTO 'output'; 
    */ 
    @Multiline private String sessionizeUserIpTest; 

    private String[] sessionizeUserIpTestData = new String[] { 
     "Harold,2014-02-18T16:14:49.503Z,123.123.123.123", 
     "Harold,2014-02-18T16:14:51.503Z,123.123.123.123", 
     "Harold,2014-02-18T16:14:55.503Z,321.321.321.321", 
     "Kumar,2014-02-18T16:14:49.503Z,123.123.123.123", 
     "Kumar,2014-02-18T16:14:55.503Z,123.123.123.123", 
     "Kumar,2014-02-18T16:15:05.503Z,321.321.321.321" 
    }; 

    @Test 
    public void sessionizeUserIpTest() throws Exception 
    { 
    PigTest test = createPigTestFromString(sessionizeUserIpTest); 

    this.writeLinesToFile("input", 
     sessionizeUserIpTestData); 

    List<Tuple> result = this.getLinesForAlias(test, "data_sessionized"); 

    assertEquals(result.size(),1); 
    assertEquals(result.get(0).get(0),"Harold"); 
    } 
+0

嗨matterhayes, 你可以請看看我的錯誤消息造成的Sessionize?謝謝! – suerte

+0

當然,您需要爲第一個字段創建時間戳。我會用這個註釋來更新這個答案。看看我在其他評論中描述的FOREACH。 – matterhayes