2016-03-09 49 views
0

我有以下Flume配置文件,用於抓取具有特定數值的服務器日誌條目,並將它們推送到相應的 kafka主題。Apache Flume。帶複用通道選擇器的正則表達式提取器

# Name the components on this agent 
a1.sources = r1 
a1.channels = c2 c3 

# Describe/configure the source 
a1.sources.r1.type = spooldir 
a1.sources.r1.spoolDir = /home/user/spoolFlume 
a1.sources.r1.fileSuffix = .DONE 
a1.sources.r1.basenameHeader = true 
a1.sources.r1.deserializer.maxLineLength = 8192 

a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = regex_extractor 
a1.sources.r1.interceptors.i1.regex = (2725391) 
a1.sources.r1.interceptors.i1.serializers = id 
a1.sources.r1.interceptors.i1.serializers.id.name = project_id 
a1.sources.r1.selector.type = multiplexing 
a1.sources.r1.selector.header = project_id 
a1.sources.r1.selector.mapping.2725391 = c3 
a1.sources.r1.selector.default = c2 


a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel 
a1.channels.c2.brokerList=kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092 
a1.channels.c2.topic = flume_test_002 
a1.channels.c2.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181 
#default = true 
a1.channels.c2.parseAsFlumeEvent = true 

a1.channels.c3.type = org.apache.flume.channel.kafka.KafkaChannel          
a1.channels.c3.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092 
a1.channels.c3.topic = flume_test_003 
a1.channels.c3.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181 
a1.channels.c3.parseAsFlumeEvent = true 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c2 c3 

我做了一些測試更復雜的正則表達式,所有看起來不錯與cat | grep -E <regexp>,但是當我試圖用它在水槽配置並不是所有的條目被捕獲。

現在我用一個字正則表達式,但即使不是所有的條目被捕獲,即不是所有的「正確」的條目到卡夫卡的話題(比如我有兩個字符串以「2725391」的日誌,但加工後我可以只看到卡夫卡中的一個條目)。

Flume config似乎有些問題。任何建議將非常感激。

更新2.甚至更多 - 當我使用短文件(少於100個字符串)解析所有的作品好。與約2GB的文件,我錯過了條目。

更新3.我找到了解析所有條目的方法。

a1.sources.r1.decodeErrorPolicy = IGNORE 

它有幫助,因爲卡夫卡頻道中解析事件的標題中有一個奇怪的符號。我不知道它從何而來,因爲有原木內部沒有這樣的符號處理前:/

basename00278388pid2725391�31.28.244.74 

回答

0

解決的辦法是設置JAVA_HOME 適當值和設定以下設置:

a1.sources.r1.decodeErrorPolicy = IGNORE 

問題的根源在日誌中的某個地方使用非UTF字符。