2013-02-01 50 views
7

我有一個Hive表,用於保存客戶調用的數據。 爲簡單起見,考慮它有2列,第一列保存客戶ID,第二列保存呼叫的時間戳(unix timestamp)。計算Hadoop中的連續記錄與Hive查詢之間的差異

我可以查詢此表爲每個客戶找到所有的呼叫:

SELECT * FROM mytable SORT BY customer_id, call_time; 

結果是:

Customer1 timestamp11 
Customer1 timestamp12 
Customer1 timestamp13 
Customer2 timestamp21 
Customer3 timestamp31 
Customer3 timestamp32 
... 

是否有可能創建一個配置單元查詢,返回,爲每一個客戶,從第二次呼叫開始,兩次連續呼叫之間的時間間隔? 對於上面的例子,查詢應該返回:

Customer1 timestamp12-timestamp11 
Customer1 timestamp13-timestamp12 
Customer3 timestamp32-timestamp31 
... 

我試圖解決方案從sql solution適應,但我堅持用蜂巢限制:it accepts subqueries only in FROMjoins must contain only equalities

謝謝。

EDIT1:

我曾嘗試使用一個蜂巢UDF功能:

public class DeltaComputerUDF extends UDF { 
private String previousCustomerId; 
private long previousCallTime; 

public String evaluate(String customerId, LongWritable callTime) { 
    long callTimeValue = callTime.get(); 
    String timeDifference = null; 

    if (customerId.equals(previousCustomerId)) { 
     timeDifference = new Long(callTimeValue - previousCallTime).toString(); 
    } 

    previousCustomerId = customerId; 
    previousCallTime = callTimeValue; 

    return timeDifference; 
}} 

,並命名爲 「三角洲」 使用它。

但似乎(從日誌和結果),它正在MAP時間使用。從這個出現2個問題:

第一:表數據必須由客戶ID和時間戳使用此功能之前進行排序。查詢:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time; 

,因爲分開部在執行時間縮短,正在使用我的功能不久後不起作用。

我可以在使用函數之前對錶格數據進行排序,但是我對此並不滿意,因爲它是我希望避免的開銷。

第二:在分佈式Hadoop配置的情況下,數據在可用的作業跟蹤器之間分裂。所以我相信這個函數會有多個實例,每個映射器都有一個實例,所以可以在兩個映射器之間分割相同的客戶數據。在這種情況下,我將失去客戶電話,這是不可接受的。

我不知道如何解決這個問題。我知道DISTRIBUTE BY確保將具有特定值的所有數據發送到同一個reducer(從而確保SORT按預期工作),是否有人知道映射器是否有類似的東西?

接下來我打算遵循libjack的建議來使用reduce腳本。這個「計算」在一些其他蜂巢查詢之間是需要的,所以我想在嘗試Hive提供的所有內容之前,按照Balaswamy vaddeman的建議,轉向另一種工具。

EDIT2:

我開始調查了自定義腳本解決方案。但是,在編程蜂巢書(本章介紹的自定義腳本)第14章的第一頁,我發現了以下一段話:

流通常比編碼可比的UDF或 InputFormat對象的效率較低。對數據進行序列化和反序列化以將其傳入管道並將其傳遞出管道的效率相對較低。以統一的方式調試整個 程序也很困難。但是,它對於快速原型 以及利用未用Java編寫的現有代碼非常有用。對於不想編寫Java代碼的Hive 用戶,它可以是非常有效的 方法。

因此很明顯,定製腳本不是效率方面的最佳解決方案。

但是,我應該如何保持自己的UDF功能,但要確保它在分佈式Hadoop配置中按預期工作?我在Language Manual UDF wiki頁面的UDF內部部分找到了這個問題的答案。如果我寫我的查詢:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

它在減少時間和BY和SORT DISTRIBUTE BY結構中調用的順序保證所有來自同一客戶的記錄被以相同的減速處理,執行。

所以上面的UDF和這個查詢構造解決了我的問題。

(對不起,不添加鏈接,但我不能這樣做,因爲我沒有足夠的信譽分)

+0

我認爲這與[此問題]非常相似(http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-of-rows )我回答了在蜂巢中使用自定義地圖/減少。你只需要提供適當的reduce腳本。 – libjack

+0

我不知道如何在配置單元中執行此操作,但是有級聯api來執行此操作。在cascading中有一些稱爲緩衝區的東西.http://docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –

回答

11

這是一個老問題,但是爲了將來的參考,我在這裏寫另一個命題:

蜂巢Windowing functions允許查詢中使用一個/下一個值。

甲simili碼查詢可以是:

SELECT CUSTOMER_ID,LAG(call_time,1,0)OVER(PARTITION BY CUSTOMER_ID ORDER BY call_time ROWS 1之前) - call_time FROM MYTABLE;

0

也許有人遇到類似的要求,我找到了解決辦法如下:

1)創建自定義功能:

package com.example; 
// imports (they depend on the hive version) 
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) " 
    + "- computes the time passed between two succesive records from the same customer. " 
    + "It generates 3 columns: first contains the customer id, second contains call time " 
    + "and third contains the time passed from the previous call. This function returns only " 
    + "the records that have a previous call from the same customer (requirements are not applicable " 
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS" 
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable " 
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;") 
public class DeltaComputerUDTF extends GenericUDTF { 
private static final int NUM_COLS = 3; 

private Text[] retCols; // array of returned column values 
private ObjectInspector[] inputOIs; // input ObjectInspectors 
private String prevCustomerId; 
private Long prevCallTime; 

@Override 
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException { 
    if (ois.length != 2) { 
     throw new UDFArgumentException(
       "There must be 2 arguments: customer Id column name and call time column name"); 
    } 

    inputOIs = ois; 

    // construct the output column data holders 
    retCols = new Text[NUM_COLS]; 
    for (int i = 0; i < NUM_COLS; ++i) { 
     retCols[i] = new Text(); 
    } 

    // construct output object inspector 
    List<String> fieldNames = new ArrayList<String>(NUM_COLS); 
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS); 
    for (int i = 0; i < NUM_COLS; ++i) { 
     // column name can be anything since it will be named by UDTF as clause 
     fieldNames.add("c" + i); 
     // all returned type will be Text 
     fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
    } 

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); 
} 

@Override 
public void process(Object[] args) throws HiveException { 
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]); 
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]); 

    if (customerId.equals(prevCustomerId)) { 
     retCols[0].set(customerId); 
     retCols[1].set(callTime.toString()); 
     retCols[2].set(new Long(callTime - prevCallTime).toString()); 
     forward(retCols); 
    } 

    // Store the current customer data, for the next line 
    prevCustomerId = customerId; 
    prevCallTime = callTime; 
} 

@Override 
public void close() throws HiveException { 
    // TODO Auto-generated method stub 

} 

} 

2)創建包含此功能的廣口瓶中。假設jarname是myjar.jar。

3)將jar複製到Hive機器上。假設它被放置在/ TMP

4)定義內部配置單元的自定義函數:

ADD JAR /tmp/myjar.jar; 
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF'; 

5)執行查詢:

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
    (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

備註:

一個。我假定call_time列將數據存儲爲bigint。如果它是字符串,在過程函數中,我們將其作爲字符串檢索(如我們對customerId所做的那樣),然後將其解析爲Long。我決定使用UDTF而不是UDF,因爲這樣它會生成所需的所有數據。否則(使用UDF)生成的數據需要過濾以跳過NULL值。因此,與在原崗位的第一個編輯描述的UDF函數(DeltaComputerUDF),查詢將是:

SELECT customer_id, call_time, time_difference 
FROM 
    (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
     (
     SELECT customer_id, call_time FROM mytable 
     DISTRIBUTE BY customer_id 
     SORT BY customer_id, call_time 
     ) t 
    ) u 
WHERE time_difference IS NOT NULL; 

℃。這兩個函數(UDF和UDTF)工作的需要,不管表內行的順序(所以沒有要求該表的數據由客戶ID進行排序,並使用δ函數之前調用時間)

1

您可以使用明確的MAP-REDUCE與其他編程語言,如Java或Python。 如果從地圖{cutomer_id,call_time}發出並在減速器中,您將獲得{customer_id,list{time_stamp}},並且在縮減器中可以對這些時間戳進行排序並處理數據。

相關問題