2014-09-10 50 views
0

我需要在Pig中處理一個數據集,它每天午夜可用。因此,我有一位Oozie協調員負責日程安排,並在每天00:00生成一個工作流程。 文件名遵循URI方案如何在Pig中轉換參數?

hdfs://${dataRoot}/input/raw${YEAR}${MONTH}${DAY}${HOUR}.avro 

其中$ {HOUR}始終是 '00'。

數據集中的每個條目都包含一個UNIX時間戳,我希望在11:45 pm(23:45)之前過濾掉那些有時間戳的條目。由於我需要在過去的數據集上運行,因此需要根據當前處理的日期動態設置定義閾值的時間戳值。例如,從2013年12月12日開始計算數據集需要閾值1418337900.因此,設置閾值必須由協調員完成。

據我所知,不可能將格式化日期轉換爲EL中的UNIX時間戳。我想出了一個相當不好的解決方案: 協調器將閾值的日期和時間傳遞給啓動Pig腳本的參數化實例的相應工作流。

摘錄coordinator.xml的:

<property> 
    <name>threshold</name> 
    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -15, 'MINUTE'), 'yyyyMMddHHmm')}</value> 
</property> 

摘錄workflow.xml的:

<action name="foo"> 
    <pig> 
     <job-tracker>${jobTracker}</job-tracker> 
     <name-node>${nameNode}</name-node> 
     <script>${applicationPath}/flights.pig</script> 
     <param>jobInput=${jobInput}</param> 
     <param>jobOutput=${jobOutput}</param> 
     <param>threshold=${threshold}</param> 
    </pig> 
    <ok to="end"/> 
    <error to="error"/> 
</action> 

豬八戒腳本需要這種格式化的日期時間轉換成UNIX時間戳。爲此,我有一個軟件寫UDF:

public class UnixTime extends EvalFunc<Long> { 

    private long myTimestamp = 0L; 

    private static long convertDateTime(String dt, String format) 
      throws IOException { 
     DateFormat formatter; 
     Date date = null; 
     formatter = new SimpleDateFormat(format); 
     try { 
      date = formatter.parse(dt); 
     } catch (ParseException ex) { 
      throw new IOException("Illegal Date: " + dt + " format: " + format); 
     } 
     return date.getTime()/1000L; 
    } 

    public UnixTime(String dt, String format) throws IOException { 
     myTimestamp = convertDateTime(dt, format); 
    } 

    @Override 
    public Long exec(Tuple input) throws IOException { 
     return myTimestamp; 
    } 

} 

在豬腳本,宏創建,與協調員/工作流的輸入初始化UDF。然後,您可以過濾時間戳。

DEFINE THRESH mystuff.pig.UnixTime('$threshold', 'yyyyMMddHHmm'); 
d = LOAD '$jobInput' USING PigStorage(',') AS (time: long, value: chararray); 
f = FILTER d BY d <= THRESH(); 
... 

,我有使我更一般的問題,如果有可能在豬變換輸入參數,並再次使用它作爲某種恆定的問題。 有沒有更好的方法來解決這個問題,還是我的方法不必要的複雜?

編輯:TL; DR

更多的搜索後,我發現有人用同樣的問題: http://grokbase.com/t/pig/user/125gszzxnx/survey-where-are-all-the-udfs-and-macros

感謝拉夫在撲滿推薦的UDF的。 似乎沒有使用聲明和shell腳本沒有高性能解決方案。

+0

您是否嘗試過使用UDF CustomFormatToISO(http://pig.apache.org/docs/r0.11.0/api/org/apache/pig/piggybank/evaluation/datetime/convert/CustomFormatToISO.html)至將給定日期(以任何格式)轉換爲日期對象,然後使用ISOToUnix(http://pig.apache.org/docs/r0.11.0/api/org/apache/pig/piggybank/evaluation/datetime/convert/ISOToUnix .html)將其轉換爲時代? – 2014-09-10 17:12:40

回答

0

您可以將Pig腳本放入Python腳本並傳遞值。

#!/usr/bin/python 
import sys 
import time 

from org.apache.pig.scripting import Pig 



P = Pig.compile("""d = LOAD '$jobInput' USING PigStorage(',') AS (time: long, value: chararray); 
f = FILTER d BY d <= '$thresh'; 
""") 

jobinput = {whatever you defined} 
thresh = {whatever you defined in the UDF} 
Q = P.bind({'thresh':thresh,'jobinput':jobinput}) 
results = Q.runSingle() 
if results.isSuccessful() == "FAILED": 
     raise "Pig job failed"