2017-03-16 36 views
0

我有一個從spooldir上的文件讀取數據的過程過程將數據加載到MySQL數據庫中。將有多種類型的文件可以通過相同的flume流程進行處理。如何重置每個批次Flume的定製接收器類中的變量

我已經創建了一個自定義接收器Java類(擴展AbstractSink),它在初始/第一次讀取之後更新本地變量(sInterfaceType)以決定文件中的數據格式。 一旦文件處理完成,我必須重置它,以便它必須從識別下一批/接口文件開始。

我試着在stop()中做,但它沒有幫助。有人這樣做嗎?

我的接收器類看起來是這樣的:

public class MyFlumeSink2 extends AbstractSink implements Configurable { 

private String sInterfaceType; //tells file format of current load 

public MyFlumeSink2() { 
    //my initialization of variables 
} 

public void configure(Context context) { 
    //read context variables 
} 

public void start() { 
    //create db connection 
} 

@Override 
public void stop() { 
    //destroy connection 
    sInterfaceType = ""; //This doesn't help me 
    super.stop(); 
} 

public Status process() throws EventDeliveryException { 
    Channel channel = getChannel(); 
    Transaction transaction = channel.getTransaction(); 

    if((sInterfaceType=="" || sInterfaceType==null)) 
    { 
    //Read first line & set sInterfaceType 
    }else 
    //Insert data in MySQL 

    transaction.commit(); 
} 
} 
+0

如何檢測(在接收器級別)文件是否已完成處理? – frb

+0

我正在考慮在完成一個批處理或文件後調用stop()方法。但看起來情況並非如此。所以我不知道你的問題的答案。 – KiranM

回答

0

我們不得不手動決定它是一個事件,也沒有要求每個新文件專門方法。

我修改了我的代碼來讀取事件線& set InterfaceType根據第一個元素。我的代碼如下所示:

public Status process() throws EventDeliveryException { 
     //....other code... 

      sEvtBody = new String(event.getBody()); 
      sFields = sEvtBody.split(","); 

      //check first field to know record type 
      enumRec = RecordType.valueOf(checkRecordType(sFields[0].toUpperCase())); 
      switch(enumRec) 
      { 
       case CUST_ID: 
        sInterfaceType = "T_CUST"; 
        bHeader = true; 
        break; 
       case TXN_ID: 
        sInterfaceType = "T_CUST_TXNS"; 
        bHeader = true; 
        break; 
       default: 
        bHeader = false; 
      } 
      //insert if not header 
      if(!bHeader) 
      { 

       if(sInterfaceType == "T_CUST") 
       { 
        if(sFields.length == 14) 
         this.bInsertStatus = daoClass.insertHeader(sFields); 
        else 
         throw new Exception("INCORRECT_COLUMN_COUNT"); 
       }else if(sInterfaceType == "T_CUST_TXNS") 
       { 
        if(sFields.length == 10) 
         this.bInsertStatus = daoClass.insertData(sFields); 
        else 
         throw new Exception("INCORRECT_COLUMN_COUNT"); 
       } 

       //if(!bInsertStatus) 
       // logTransaction(sFields); 
      } 
      //....Other code.... 
相關問題