雖然你可以寫一個UDF對於這一點,你實際上可以利用的UDF在Apache DataFu已經可以解決這個問題。
我的解決方案涉及將會話化應用於數據。基本上你會看到連續的事件併爲每個事件分配一個會話ID。如果兩個事件之間的時間超過了指定的時間(在您的情況下爲5秒),則下一個事件將獲得新的會話ID。否則,連續的事件會獲得相同的會話ID。一旦爲每個事件分配了其會話ID,其餘都很容易。我們按會話ID進行分組,並查找具有多個不同IP地址的會話。
我會穿過我的解決方案。
假設您有以下輸入數據。哈羅德和庫馬爾都改變了他們的IP地址。但哈羅德在5秒鐘內完成,而庫馬爾則沒有。所以我們腳本的輸出應該只是簡單的「哈羅德」。
Harold,2014-02-18T16:14:49.503Z,123.123.123.123
Harold,2014-02-18T16:14:51.503Z,123.123.123.123
Harold,2014-02-18T16:14:55.503Z,321.321.321.321
Kumar,2014-02-18T16:14:49.503Z,123.123.123.123
Kumar,2014-02-18T16:14:55.503Z,123.123.123.123
Kumar,2014-02-18T16:15:05.503Z,321.321.321.321
加載數據
data = LOAD 'input' using PigStorage(',')
AS (user:chararray,time:chararray,ip:chararray);
現在從DataFu定義一對夫婦的UDF。如前所述,UDF執行會話。 DistinctBy UDF將用於在每個會話中查找不同的IP地址。
define Sessionize datafu.pig.sessions.Sessionize('5s');
define DistinctBy datafu.pig.bags.DistinctBy('1');
按用戶分組數據,按時間排序並應用Sessonize UDF。請注意,時間戳必須是第一個字段,因爲這是Sessionize期望的。此UDF將會話ID附加到每個元組。
data = FOREACH data GENERATE time,user,ip;
data_sessionized = FOREACH (GROUP data BY user) {
views = ORDER data BY time;
GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id);
}
既然數據已被會話化,我們可以按用戶和會話進行分組。我也被用戶分組,因爲我想把這個值吐出來。我們將這些事件傳遞給DistinctBy UDF。查看此UDF的文檔以獲取更詳細的說明。但基本上,我們將獲得儘可能多的元組,因爲每個會話有不同的IP地址。請注意,我已經從下面的關係中刪除了時間。這是因爲1)它不是必需的,2)DataFu 1.2.0中的DistinctBy在處理包含短劃線的字段時存在一個錯誤,就像時間字段一樣。
data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id;
data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE
group.user as user,
SIZE(DistinctBy(data_sessionized)) as distinctIpCount;
現在選擇所有具有多個不同IP地址的會話,並返回這些會話的不同用戶。
data_sessionized = FILTER data_sessionized BY distinctIpCount > 1;
data_sessionized = FOREACH data_sessionized GENERATE user;
data_sessionized = DISTINCT data_sessionized;
這只是生產:
Harold
以下是完整的源代碼,您應該能夠直接粘貼到DataFu單元測試和運行:
/**
define Sessionize datafu.pig.sessions.Sessionize('5s');
define DistinctBy datafu.pig.bags.DistinctBy('1'); -- distinct by ip
data = LOAD 'input' using PigStorage(',') AS (user:chararray,time:chararray,ip:chararray);
data = FOREACH data GENERATE time,user,ip;
data_sessionized = FOREACH (GROUP data BY user) {
views = ORDER data BY time;
GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id);
}
data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id;
data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE
group.user as user,
SIZE(DistinctBy(data_sessionized)) as distinctIpCount;
data_sessionized = FILTER data_sessionized BY distinctIpCount > 1;
data_sessionized = FOREACH data_sessionized GENERATE user;
data_sessionized = DISTINCT data_sessionized;
STORE data_sessionized INTO 'output';
*/
@Multiline private String sessionizeUserIpTest;
private String[] sessionizeUserIpTestData = new String[] {
"Harold,2014-02-18T16:14:49.503Z,123.123.123.123",
"Harold,2014-02-18T16:14:51.503Z,123.123.123.123",
"Harold,2014-02-18T16:14:55.503Z,321.321.321.321",
"Kumar,2014-02-18T16:14:49.503Z,123.123.123.123",
"Kumar,2014-02-18T16:14:55.503Z,123.123.123.123",
"Kumar,2014-02-18T16:15:05.503Z,321.321.321.321"
};
@Test
public void sessionizeUserIpTest() throws Exception
{
PigTest test = createPigTestFromString(sessionizeUserIpTest);
this.writeLinesToFile("input",
sessionizeUserIpTestData);
List<Tuple> result = this.getLinesForAlias(test, "data_sessionized");
assertEquals(result.size(),1);
assertEquals(result.get(0).get(0),"Harold");
}
你需要編寫一個UDF。 –
你可以看看使用esper – robthewolf