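Flume custom interceptor not working
I want to replace the IP addresses in my data with their masked form. This is done in the "backup" part of my Flume agent (see below).
The configuration has two channels: the first dumps data into HBase, and the second is used for backup: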
a1.sources = r1 r2
a1.channels = channel1 Backup_channel
a1.sinks = FSink
a1.sources.r1.handler = com.flume.handler.JSONHandler
a1.sources.r1.type = avro
a1.sources.r1.bind = x.x.x.x
a1.sources.r1.port = 10008
a1.sources.r2.handler = com.flume.handler.JSONHandler
a1.sources.r2.type = avro
a1.sources.r2.bind = x.x.x.x
a1.sources.r2.port = 10009
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = com.flume.interceptor.DcInterceptor
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /root/flume/channels/Livechannel/checkpoint
a1.channels.channel1.dataDirs = /root/flume/channels/Livechannel/data
a1.sinks.FSink.type = hbase
a1.sinks.FSink.table = Temp_Test
a1.sinks.FSink.batchSize = 300
a1.sinks.FSink.columnFamily = T
a1.sinks.FSink.serializer = com.flume.sink.TestTP
a1.sources.r1.channels = channel1
a1.sources.r2.channels = Backup_channel
a1.channels.Backup_channel.type = file
a1.channels.Backup_channel.checkpointDir = /data/disk/flume/backup/checkpoint
a1.channels.Backup_channel.dataDirs = /data/disk/flume/backup/data
a1.sinks.FSink.channel = channel1
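Below is the Java code of my custom interceptor. It implements the intercept method, reads the IP address from the event body, computes the masked IP, and writes it back into the body. But somehow it does not work: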
public class DcInterceptor implements Interceptor {
    private byte[] jsonTestBeans;
    private final Type listType = new TypeToken<List<TestBeans>>() {}.getType();

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public void initialize() {
        // TODO Auto-generated method stub
        new Logger();
    }

    @Override
    public Event intercept(Event event) {
        // TODO Auto-generated method stub
        List<Row> actions = new ArrayList<Row>();
        this.jsonTestBeans = event.getBody();
        Logger.logger.debug("In Interceptor");
        System.out.println("In Interceptor");
        Gson _Gson = new Gson();
        String jsonstr = "";
        try {
            jsonstr = new String(jsonTestBeans, "UTF-8");
        } catch (Exception e) {
            // TODO: handle exception
            Logger.logger.error(e.getMessage() + "In Interceptor");
            jsonstr = new String(jsonTestBeans);
        }
        List<TestBeans> TestBeanss = _Gson.fromJson(jsonstr, listType);
        System.out.println("Json String :" + jsonstr);
        List<String> gTouch = new ArrayList<String>();
        for (TestBeans TestBeans : TestBeanss) {
            String str = TestBeans.getIp();
            Logger.logger.debug("IP : " + str);
            String st = (str.substring(0, str.lastIndexOf(".") + 1) + "x");
            Logger.logger.debug("Mask IP : " + st);
            TestBeans.setRemoteIp(st);
        }
        event.setBody(_Gson.toJson(TestBeanss).getBytes());
        Logger.logger.debug("Interceptor Ends");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // TODO Auto-generated method stub
        System.out.println("In List Interceptor");
        Logger.logger.debug("In List Interceptor");
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public static class CounterInterceptorBuilder implements Interceptor.Builder {
        private Context ctx;

        @Override
        public Interceptor build() {
            Logger.logger.debug("In Interceptor Build");
            System.out.println("In Build Interceptor");
            return new DcInterceptor();
        }

        @Override
        public void configure(Context context) {
            this.ctx = context;
        }
    }
}
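The masking step itself (keep everything up to the last dot, then append "x") can be pulled out and checked independently of Flume. The following is only a minimal sketch: the class name IpMasker and the method maskLastOctet are hypothetical and do not appear in the interceptor above, and the handling of null values and of values without a dot is an assumption, not the behaviour of the original code.

```java
// Hypothetical helper for illustration; not part of the original interceptor.
final class IpMasker {
    private IpMasker() {}

    /** Replaces the last dot-separated segment with "x". */
    static String maskLastOctet(String ip) {
        if (ip == null) {
            return null; // assumption: nothing to mask, pass null through
        }
        int lastDot = ip.lastIndexOf('.');
        if (lastDot < 0) {
            return ip; // assumption: leave values without a dot unchanged
        }
        // Keep everything up to and including the last dot, then append "x".
        return ip.substring(0, lastDot + 1) + "x";
    }

    public static void main(String[] args) {
        System.out.println(maskLastOctet("192.168.1.10")); // prints 192.168.1.x
    }
}
```

With the inline expression used in the interceptor, a null IP would throw a NullPointerException inside intercept, and a value without a dot would be turned into just "x", so guarding those cases may make failures easier to spot in the logs.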
That was a typo; it is correct now. Could you please check again, @frb? Thanks.
Good. In that case, try adding 'package com.flume.interceptor;' at the top of the interceptor file. – frb
If it still fails, please add the relevant logs to your question. – frb
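One more thing that may be worth checking, separately from the package declaration: the Flume user guide describes the type of a custom interceptor as the fully qualified class name of its Interceptor.Builder implementation, not of the interceptor class itself. Assuming the class really lives in the package com.flume.interceptor, as the configuration suggests, the line would then look roughly like this (a config sketch, not a verified fix):

```properties
# Point the type at the nested Builder class; "$" separates outer and nested class names.
a1.sources.r2.interceptors.i1.type = com.flume.interceptor.DcInterceptor$CounterInterceptorBuilder
```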