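Apache Flume custom interceptor - binary and weird output in HDFS
I am fairly new to the concept of Flume interceptors and am facing a problem. Before I applied the interceptor, the file written to HDFS was a normal text file. After applying it, the output became binary and looks very strange.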
Below is my interceptor code:
package com.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.interceptor.Interceptor;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomHostInterceptor implements Interceptor {

    private String hostValue;
    private final String hostHeader;

    public CustomHostInterceptor(String hostHeader) {
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor startup: resolve the local hostname once
        try {
            hostValue = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {
        // Decode the body explicitly as UTF-8 rather than with the platform default charset
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        if (body.toLowerCase().contains("text")) {
            // Note: this replaces the WHOLE body with "hadoop", not only the word "text"
            event.setBody("hadoop".getBytes(StandardCharsets.UTF_8));
        }
        // Enrich the event's headers with the hostname
        Map<String, String> headers = event.getHeaders();
        headers.put(hostHeader, hostValue);
        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept every event in the batch
            interceptedEvents.add(intercept(event));
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown: nothing to release
    }

    public static class Builder implements Interceptor.Builder {
        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Read the header name from the Flume conf (falls back to "hostname" if unset)
            hostHeader = context.getString("hostHeader", "hostname");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}
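To see what intercept(Event) does to each event, its logic can be tried outside a running agent. The following is a minimal, self-contained approximation, not the Flume API: MiniEvent is a hypothetical stand-in for org.apache.flume.Event (just a body and a header map), and "quickstart" is an assumed hostname used in place of the one InetAddress would resolve.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class InterceptLogicSketch {

    // Hypothetical stand-in for org.apache.flume.Event: a byte[] body plus headers
    static final class MiniEvent {
        byte[] body;
        final Map<String, String> headers = new HashMap<>();

        MiniEvent(String body) {
            this.body = body.getBytes(StandardCharsets.UTF_8);
        }

        String bodyAsString() {
            return new String(body, StandardCharsets.UTF_8);
        }
    }

    // Same rule as CustomHostInterceptor.intercept(Event): if the body contains
    // "text" (case-insensitive) the whole body becomes "hadoop"; every event
    // gets the hostname header regardless.
    static MiniEvent intercept(MiniEvent event, String hostHeader, String hostValue) {
        String body = event.bodyAsString();
        if (body.toLowerCase().contains("text")) {
            event.body = "hadoop".getBytes(StandardCharsets.UTF_8);
        }
        event.headers.put(hostHeader, hostValue);
        return event;
    }

    public static void main(String[] args) {
        MiniEvent e = intercept(new MiniEvent("some text"), "hostname", "quickstart");
        System.out.println(e.bodyAsString() + " " + e.headers); // hadoop {hostname=quickstart}
    }
}
```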
The Flume configuration is:
agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1
agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/app.log
agent.sources.exec-source.interceptors = i1
agent.sources.exec-source.interceptors.i1.type = com.flume.CustomHostInterceptor$Builder
agent.sources.exec-source.interceptors.i1.hostHeader = hostname
agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path= hdfs://localhost:8020/bosch/flume/applogs
agent.sinks.hdfs-sink.hdfs.filePrefix=logs
agent.sinks.hdfs-sink.hdfs.rollInterval=60
agent.sinks.hdfs-sink.hdfs.rollSize=0
agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000
agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1
Running cat on the file created in HDFS gives:
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable���*q�CJv�/ESmP�ź
some textP�żc
some more textP���K
textP��ߌangels and deamonsP��%�
text bla blaP��1�angels and deamonsP��1�
testP��1�hmmmP��1�anything
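The first bytes of this dump explain the "binary" look. SEQ, followed by the key class org.apache.hadoop.io.LongWritable and the value class org.apache.hadoop.io.BytesWritable, is the header of a Hadoop SequenceFile. That is the HDFS sink's default hdfs.fileType; an interceptor can modify or drop events but has no say in the file format the sink writes. Setting agent.sinks.hdfs-sink.hdfs.fileType = DataStream makes the sink write the event bodies as plain text instead. As a quick check, the sketch below (SeqFileSniffer is a hypothetical helper, not part of Flume or Hadoop) tests whether a file copied to local disk, for example with hdfs dfs -get, starts with the SequenceFile magic bytes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SeqFileSniffer {

    // A Hadoop SequenceFile begins with the ASCII bytes 'S','E','Q' followed by a version byte
    static boolean looksLikeSequenceFile(byte[] head) {
        return head.length >= 4 && head[0] == 'S' && head[1] == 'E' && head[2] == 'Q';
    }

    // Read at most the first n bytes of a local file (Java 11+ for readNBytes)
    static byte[] readHead(String path, int n) throws IOException {
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            return in.readNBytes(n);
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java SeqFileSniffer <local copy of the HDFS file> ...
        for (String path : args) {
            boolean seq = looksLikeSequenceFile(readHead(path, 4));
            System.out.println(path + ": " + (seq ? "Hadoop SequenceFile" : "not a SequenceFile (e.g. plain text)"));
        }
    }
}
```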
Any suggestions?
Thanks
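One thing is certain: I am going to mark this as the correct answer, because it works. However, since this is my first time using interceptors, I would like to know what exactly it does. I thought it would process my data in real time and actually check for bodies containing "text" and replace it with "hadoop". That is not happening; any suggestions? – AJ84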
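Note that, as posted, the interceptor replaces the whole body with "hadoop" whenever it contains "text", so "text bla bla" becomes just "hadoop". If the intent is to swap only the word, a regex replacement would do that. The sketch contrasts the two on plain strings; replaceWordOnly and its word-boundary pattern are a hypothetical alternative, not what the posted code does.

```java
import java.util.regex.Pattern;

public class ReplaceWordSketch {

    // What CustomHostInterceptor does today: the entire body is swapped out
    static String replaceWholeBody(String body) {
        return body.toLowerCase().contains("text") ? "hadoop" : body;
    }

    // Hypothetical alternative: keep the rest of the line and only swap the
    // standalone word "text", matched case-insensitively like the original check
    private static final Pattern TEXT_WORD =
            Pattern.compile("\\btext\\b", Pattern.CASE_INSENSITIVE);

    static String replaceWordOnly(String body) {
        return TEXT_WORD.matcher(body).replaceAll("hadoop");
    }

    // Inside intercept(Event) this would be applied to the decoded body, e.g.
    // event.setBody(replaceWordOnly(new String(event.getBody(), UTF_8)).getBytes(UTF_8));
    public static void main(String[] args) {
        String line = "text bla bla";
        System.out.println(replaceWholeBody(line)); // hadoop
        System.out.println(replaceWordOnly(line));  // hadoop bla bla
    }
}
```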
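That is possible: you can use interceptors to transform and enrich data. Do it in the method public Event intercept(Event event) {}: add print statements there, and debug and process the messages as you wish. – RAJESH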
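A sketch of that kind of print-statement debugging, mirroring intercept(List<Event>) over raw byte[] bodies so it runs without Flume (DebugBatchSketch and its methods are hypothetical names). Inside a real agent, System.out output typically only appears where the agent's stdout goes, such as the console when flume-ng agent runs in the foreground; a logger such as SLF4J, which Flume's own components use, writes to the agent log instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DebugBatchSketch {

    // Same rule as CustomHostInterceptor.intercept(Event), applied to a raw body
    static byte[] rewrite(byte[] body) {
        String s = new String(body, StandardCharsets.UTF_8);
        return s.toLowerCase().contains("text")
                ? "hadoop".getBytes(StandardCharsets.UTF_8)
                : body;
    }

    // Mirrors intercept(List<Event>): rewrite each body and print before/after
    static List<byte[]> interceptAll(List<byte[]> bodies) {
        List<byte[]> out = new ArrayList<>(bodies.size());
        for (byte[] body : bodies) {
            byte[] rewritten = rewrite(body);
            System.out.println("[CustomHostInterceptor] '"
                    + new String(body, StandardCharsets.UTF_8) + "' -> '"
                    + new String(rewritten, StandardCharsets.UTF_8) + "'");
            out.add(rewritten);
        }
        return out;
    }

    public static void main(String[] args) {
        interceptAll(List.of(
                "some text".getBytes(StandardCharsets.UTF_8),
                "hmmm".getBytes(StandardCharsets.UTF_8)));
    }
}
```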
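If you have more questions, you can post them as separate, specific questions. This question has been answered correctly, and you can mark it as answered, since your comment indicates it works. – RAJESH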