2017-03-14 133 views
1

我想通過其掩碼IP更改數據中的IP地址。這是在我的Flume代理的「備份」部分完成的(見下文)。Flume自定義攔截器不工作

在此配置有2個通道:所述第一信道的數據轉儲到HBase的,而第二個用於備份:

a1.sources = r1 r2 
a1.channels = channel1 Backup_channel 
a1.sinks = FSink 

a1.sources.r1.handler = com.flume.handler.JSONHandler 
a1.sources.r1.type = avro 
a1.sources.r1.bind = x.x.x.x 
a1.sources.r1.port = 10008 

a1.sources.r2.handler = com.flume.handler.JSONHandler 
a1.sources.r2.type = avro 
a1.sources.r2.bind = x.x.x.x 
a1.sources.r2.port = 10009 
a1.sources.r2.interceptors = i1 
a1.sources.r2.interceptors.i1.type = com.flume.interceptor.DcInterceptor 

a1.channels.channel1.type = file 
a1.channels.channel1.checkpointDir = /root/flume/channels/Livechannel/checkpoint 
a1.channels.channel1.dataDirs = /root/flume/channels/Livechannel/data 

a1.sinks.FSink.type = hbase 
a1.sinks.FSink.table = Temp_Test 
a1.sinks.FSink.batchSize = 300 
a1.sinks.FSink.columnFamily = T 
a1.sinks.FSink.serializer = com.flume.sink.TestTP 

a1.sources.r1.channels = channel1 
a1.sources.r2.channels = Backup_channel 

a1.channels.Backup_channel.type = file 
a1.channels.Backup_channel.checkpointDir = /data/disk/flume/backup/checkpoint 
a1.channels.Backup_channel.dataDirs = /data/disk/flume/backup/data 

a1.sinks.FSink.channel = channel1 

以下是我定製的Java代碼攔截。它實現了攔截方法,從身體獲取IP地址,計算其IP掩碼,然後將其添加到主體。但不知何故,它不工作:

public class DcInterceptor implements Interceptor { 
    private byte[] jsonTestBeans; 

    private final Type listType = new TypeToken < List <TestBeans>>() {}.getType(); 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void initialize() { 
     // TODO Auto-generated method stub 
     new Logger(); 
    } 

    @Override 
    public Event intercept(Event event) { 
     // TODO Auto-generated method stub 
     List <Row> actions = new ArrayList <Row>(); 
     this.jsonTestBeans = event.getBody(); 
     Logger.logger.debug("In Interceptor"); 
     System.out.println("In Interceptor"); 
     Gson _Gson = new Gson(); 
     String jsonstr = ""; 
     try { 
      jsonstr = new String(jsonTestBeans, "UTF-8"); 
     } catch (Exception e) { 
      // TODO: handle exception 
      Logger.logger.error(e.getMessage() + "In Interceptor"); 
      jsonstr = new String(jsonTestBeans); 
     } 
     List <TestBeans> TestBeanss = _Gson.fromJson(jsonstr, listType); 
     System.out.println("Json String :" + jsonstr); 
     List <String> gTouch = new ArrayList <String>(); 
     for (TestBeans TestBeans: TestBeanss) { 
      String str = TestBeans.getIp(); 
      Logger.logger.debug("IP : " + str); 
      String st = (str.substring(0, str.lastIndexOf(".") + 1) + "x"); 
      Logger.logger.debug("Mask IP : " + st); 
      TestBeans.setRemoteIp(st); 
     } 
     event.setBody(_Gson.toJson(TestBeanss).getBytes()); 
     Logger.logger.debug("Interceptor Ends"); 
     return event; 
    } 

    @Override 
    public List <Event> intercept(List <Event> events) { 
     // TODO Auto-generated method stub 
     System.out.println("In List Interceptor"); 
     Logger.logger.debug("In List Interceptor"); 
     for (Event event: events) { 
      intercept(event); 
     } 
     return events; 
    } 

    public static class CounterInterceptorBuilder implements Interceptor.Builder { 

     private Context ctx; 

     @Override 
     public Interceptor build() { 
      Logger.logger.debug("In Interceptor Build"); 
      System.out.println("In Build Interceptor"); 
      return new DcInterceptor(); 
     } 

     @Override 
     public void configure(Context context) { 
      this.ctx = context; 
     } 

    } 

回答

0

至少,我可以看到:

  • 關於你的攔截器的配置線指的經紀人打電話ECircleTp_Test,而其餘配置是指a1
  • 您已配置com.flume.interceptor.DcInterceptor2,但您開發的攔截器類名爲DcInterceptor(沒有最終的2)。
  • 您已將com.flume.interceptor.DcInterceptor2配置爲自定義攔截器的完全限定類名稱。儘管如此,攔截器的代碼並沒有爲DcInterceptor(2)類聲明任何包。
+0

這是一個打字錯誤,現在它是完美的。可以üplz再次檢查#frb謝謝 –

+0

很好。所以,嘗試添加到攔截器的'package com.flume.interceptor;'到文件的開頭。 – frb

+0

如果仍然失敗,請將相關日誌添加到您的問題。 – frb