2013-03-11 24 views
4

我正在嘗試創建Ooize協調器。問題是我已經有數據等待使用oozie進行處理。Oozie協調員。如何將過去的數據提供給mapreduce工作?

想象一下這樣的情況。

  1. 當前日期是:2013年3月1日(2013年3月1日)

  2. 我確實有這些輸入目錄:

    /分期/着陸/源/ xvlr/2013/02/01/00(2013年2月第一個月,每天的第一個小時) /staging/landing/source/xvlr/2013/02/01/01

    /staging/landing/source/xvlr/2013/02/01/02

    /分段/降落/源極/ xvlr/2013/02/01/03

    /分段/降落/源極/ xvlr/2013/02/1月4日

    ....

    /分期/降落/源極/ xvlr/2013/02/28/00

    ...

    /分段/降落/源極/ xvlr/2013/02 /23分之28

我希望我的Oozie的協調,以消耗所有先前創建的登陸數據 和產生這樣的輸出:

/masterdata/source/xvlr/2013/02/01/00 
/masterdata/source/xvlr/2013/02/01/01 
/masterdata/source/xvlr/2013/02/01/02 
/masterdata/source/xvlr/2013/02/01/03 
/masterdata/source/xvlr/2013/02/01/04 
.... 
/masterdata/source/xvlr/2013/02/28/00 
... 
/masterdata/source/xvlr/2013/02/28/23 

然後我想我的協調運行每個小時,產生新的輸出masterdata。

如何使用協調器規範來實現? 這是我的協調員。它什麼也沒做。它確實達到我需要的時間,然後等待。它沒有開始工作。

請指教。

<coordinator-app name="Xvlr-parser-coordinator" frequency="60" 
       start="2013-03-07T05:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3"> 
    <controls> 
     <timeout>5</timeout> 
     <concurrency>4</concurrency> 
    </controls> 

    <datasets> 
     <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 
     <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 

    </datasets> 

    <input-events> 
     <data-in name="xvlrInputEvent" dataset="xvlrInputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-in> 
    </input-events> 

    <output-events> 
     <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-out> 
    </output-events> 
    <action> 
     <workflow> 
      <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path> 
      <configuration> 
       <property> 
        <name>inputDir</name> 
        <value>${coord:dataIn('xvlrInputEvent')}</value> 
       </property> 
       <property> 
        <name>outputDir</name> 
        <value>${coord:dataOut('xvlrOutputEvent')}</value> 
       </property> 

      </configuration> 

     </workflow> 
    </action> 
</coordinator-app> 

回答

4

這裏是正確的解決方案(它的工作好幾天了:)))):

<coordinator-app name="Xvlr-parser-coordinator" frequency="60" 
       start="2013-03-07T16:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3"> 
    <controls> 
     <timeout>3</timeout> 
     <concurrency>1</concurrency> 
    </controls> 

    <datasets> 
     <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 
     <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 

    </datasets> 

    <input-events> 
     <data-in name="xvlrInputEvent" dataset="xvlrInputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-in> 
    </input-events> 

    <output-events> 
     <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-out> 
    </output-events> 
    <action> 
     <workflow> 
      <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path> 
      <configuration> 
       <property> 
        <name>inputDir</name> 
        <value>${coord:dataIn('xvlrInputEvent')}</value> 
       </property> 
       <property> 
        <name>outputDir</name> 
        <value>${coord:dataOut('xvlrOutputEvent')}</value> 
       </property> 
      </configuration> 

     </workflow> 
    </action> 
</coordinator-app> 

它有什麼作用?

  • 起初它並開始2013-03-07T16:35Z,所以所有以前
    收集的數據已通過底層工作流傳遞(一個Mr工作 調用與分析功能)
    • 在處理「過去時間數據集」(數據集時間小於當前時間)時,工作流逐一運行:它確實消耗了 /pastdate/hour_00,然後立即開始消耗 /pastdate/hour_01等
    • 當協調員到達現在的時間,它開始每小時調用一次工作流程(按照設計:05:35,06:35 ... 23:35)。
    • 查看超時聲明:我確實有缺失的數據集:例如3月第一個10小時沒有數據。 工作流確實等了3分鐘,然後死亡。

問題已解決。