0
我還沒有找到關於在包中使用udfs的太多教程。pig - 如何使用python udf從數據集計算速度
比方說,我有以下數據集:
UID : distance_from_something : timestamp
100:100:0
100:101:1
100:102:2
200:200:0
200:202:3
200:204:6
300:300:0
300:303:5
現在我想計算速度爲每一個UID
data = LOAD 'testfile' USING PigStorage(':') AS (
uid:long,
distance:int,
time_raw:long);
SPLIT data INTO
good_data IF (
(uid > 0L)),
bad_data OTHERWISE;
REGISTER '$UDFPATH//calculateVelocity.py' USING jython AS vcalc;
grouped_data = GROUP good_data BY (long)$0;
data = FOREACH grouped_data GENERATE vcalc.calculate(good_data);
flat_data = FOREACH data GENERATE FLATTEN($0);
這是一個很好的方式,如果換做這樣的事情,例如,我希望輸出看起來像這樣:
100:100:0:1
100:101:1:1
100:102:2:1
200:200:0:0.666...
200:202:3:0.666...
200:204:6:0.666...
300:300:0:0.6
300:303:5:0.6
什麼是這種場景中的最佳方式ario使用非線性插值計算速度?
這是我目前的佔位符:
def compared_to_previous(bag, index):
dx = float(bag[index][1] - bag[index - 1][1])
dt = float(bag[index][-1] - bag[index - 1][-1])/1000
return dx/dt
def compared_to_next(bag, index):
return compared_to_previous(bag, index+1)
def calculate(inBag):
outBag = []
index = 0
tuples = len(inBag)
for t in inBag:
row = list(t)
if not index:
row.append(compared_to_next(inBag, index))
elif index == tuples - 1:
row.append(compared_to_previous(inBag, index))
else:
v = compared_to_previous(inBag, index)
v += compared_to_next(inBag, index)
row.append(v/2)
outBag.append(tuple(row))
return outBag
你的實現與我原來添加的問題有什麼不同?有沒有實際的差異? – warbaque
它看起來像你基本上實現了我的建議。唯一的區別在於,你在整個UDF中保留了UID,而我立即生成了它,但這並不重要。 –