我有一個Hive表,用於保存客戶調用的數據。 爲簡單起見,考慮它有2列,第一列保存客戶ID,第二列保存呼叫的時間戳(unix timestamp)。計算Hadoop中的連續記錄與Hive查詢之間的差異
我可以查詢此表爲每個客戶找到所有的呼叫:
SELECT * FROM mytable SORT BY customer_id, call_time;
結果是:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
是否有可能創建一個配置單元查詢,返回,爲每一個客戶,從第二次呼叫開始,兩次連續呼叫之間的時間間隔? 對於上面的例子,查詢應該返回:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
我試圖解決方案從sql solution適應,但我堅持用蜂巢限制:it accepts subqueries only in FROM和joins must contain only equalities。
謝謝。
EDIT1:
我曾嘗試使用一個蜂巢UDF功能:
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
,並命名爲 「三角洲」 使用它。
但似乎(從日誌和結果),它正在MAP時間使用。從這個出現2個問題:
第一:表數據必須由客戶ID和時間戳使用此功能之前進行排序。查詢:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
,因爲分開部在執行時間縮短,正在使用我的功能不久後不起作用。
我可以在使用函數之前對錶格數據進行排序,但是我對此並不滿意,因爲它是我希望避免的開銷。
第二:在分佈式Hadoop配置的情況下,數據在可用的作業跟蹤器之間分裂。所以我相信這個函數會有多個實例,每個映射器都有一個實例,所以可以在兩個映射器之間分割相同的客戶數據。在這種情況下,我將失去客戶電話,這是不可接受的。
我不知道如何解決這個問題。我知道DISTRIBUTE BY確保將具有特定值的所有數據發送到同一個reducer(從而確保SORT按預期工作),是否有人知道映射器是否有類似的東西?
接下來我打算遵循libjack的建議來使用reduce腳本。這個「計算」在一些其他蜂巢查詢之間是需要的,所以我想在嘗試Hive提供的所有內容之前,按照Balaswamy vaddeman的建議,轉向另一種工具。
EDIT2:
我開始調查了自定義腳本解決方案。但是,在編程蜂巢書(本章介紹的自定義腳本)第14章的第一頁,我發現了以下一段話:
流通常比編碼可比的UDF或 InputFormat對象的效率較低。對數據進行序列化和反序列化以將其傳入管道並將其傳遞出管道的效率相對較低。以統一的方式調試整個 程序也很困難。但是,它對於快速原型 以及利用未用Java編寫的現有代碼非常有用。對於不想編寫Java代碼的Hive 用戶,它可以是非常有效的 方法。
因此很明顯,定製腳本不是效率方面的最佳解決方案。
但是,我應該如何保持自己的UDF功能,但要確保它在分佈式Hadoop配置中按預期工作?我在Language Manual UDF wiki頁面的UDF內部部分找到了這個問題的答案。如果我寫我的查詢:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
它在減少時間和BY和SORT DISTRIBUTE BY結構中調用的順序保證所有來自同一客戶的記錄被以相同的減速處理,執行。
所以上面的UDF和這個查詢構造解決了我的問題。
(對不起,不添加鏈接,但我不能這樣做,因爲我沒有足夠的信譽分)
我認爲這與[此問題]非常相似(http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-of-rows )我回答了在蜂巢中使用自定義地圖/減少。你只需要提供適當的reduce腳本。 – libjack
我不知道如何在配置單元中執行此操作,但是有級聯api來執行此操作。在cascading中有一些稱爲緩衝區的東西.http://docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –