2013-09-24 98 views
0

我還沒有找到關於在包中使用udfs的太多教程。pig - 如何使用python udf從數據集計算速度

比方說,我有以下數據集:

UID : distance_from_something : timestamp 
100:100:0 
100:101:1 
100:102:2 
200:200:0 
200:202:3 
200:204:6 
300:300:0 
300:303:5 

現在我想計算速度爲每一個UID

data = LOAD 'testfile' USING PigStorage(':') AS (
    uid:long, 
    distance:int, 
    time_raw:long); 

SPLIT data INTO 
    good_data IF (
     (uid > 0L)), 
    bad_data OTHERWISE; 

REGISTER '$UDFPATH//calculateVelocity.py' USING jython AS vcalc; 

grouped_data = GROUP good_data BY (long)$0; 
data = FOREACH grouped_data GENERATE vcalc.calculate(good_data); 
flat_data = FOREACH data GENERATE FLATTEN($0); 

這是一個很好的方式,如果換做這樣的事情,例如,我希望輸出看起來像這樣:

100:100:0:1 
100:101:1:1 
100:102:2:1 
200:200:0:0.666... 
200:202:3:0.666... 
200:204:6:0.666... 
300:300:0:0.6 
300:303:5:0.6 

什麼是這種場景中的最佳方式ario使用非線性插值計算速度?

這是我目前的佔位符:

def compared_to_previous(bag, index): 
    dx = float(bag[index][1] - bag[index - 1][1]) 
    dt = float(bag[index][-1] - bag[index - 1][-1])/1000 
    return dx/dt 

def compared_to_next(bag, index): 
    return compared_to_previous(bag, index+1) 

def calculate(inBag): 
    outBag = [] 

    index = 0 
    tuples = len(inBag) 
    for t in inBag: 
     row = list(t) 
     if not index: 
      row.append(compared_to_next(inBag, index)) 
     elif index == tuples - 1: 
      row.append(compared_to_previous(inBag, index)) 
     else: 
      v = compared_to_previous(inBag, index) 
      v += compared_to_next(inBag, index) 
      row.append(v/2) 
     outBag.append(tuple(row)) 

    return outBag 

回答

1

我把它留給你真正實現速度的計算 - 這不是清楚你將如何處理變速爲一體,並實現不是豬的問題。但將這些數據轉化爲UDF非常簡單。

您不想將good_data傳遞給UDF - 它指的是一個關係,而不是一個字段。你需要收集在一起,每個UID的所有記錄,然後將該集合傳遞給知道如何處理他們UDF:

data = 
    FOREACH (GROUP good_data BY uid) 
    GENERATE 
     group, 
     FLATTEN(vcalc.calculate(good_data.(distance, time_raw))); 

輸入到你的UDF的形式是對(距離,time_raw一袋),你的輸出應該是一個三元組的形式(距離,time_raw,velocity)。

+0

你的實現與我原來添加的問題有什麼不同?有沒有實際的差異? – warbaque

+0

它看起來像你基本上實現了我的建議。唯一的區別在於,你在整個UDF中保留了UID,而我立即生成了它,但這並不重要。 –