2013-10-10 14 views
2

我正在測試flume將數據加載到hHase中,並考慮使用flume的選擇器和inteceptor進行並行數據加載,因爲源和匯之間的速度差距。如何在槽中一起使用regex_extractor選擇器和多路複用攔截器?

所以,我想與水槽做的

  1. 創建活動的頭與攔截器的regex_extractor型
  2. 與頭多於兩個通道多路複用器的事件

    在一個

    選擇的複用型源溝道水槽。

並嘗試配置如下。

 

    agent.sources = tailsrc 
    agent.channels = mem1 mem2 
    agent.sinks = std1 std2 
    agent.sources.tailsrc.type = exec 
    agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt 
    agent.sources.tailsrc.batchSize = 1 

    agent.sources.tailsrc.interceptors = i1 
    agent.sources.tailsrc.interceptors.i1.type = regex_extractor 
    agent.sources.tailsrc.interceptors.i1.regex = ^(\\d) 
    agent.sources.tailsrc.interceptors.i1.serializers = t1 
    agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type 

    agent.sources.tailsrc.selector.type = multiplexing 
    agent.sources.tailsrc.selector.header = type 
    agent.sources.tailsrc.selector.mapping.1 = mem1 
    agent.sources.tailsrc.selector.mapping.2 = mem2 

    agent.sinks.std1.type = file_roll 
    agent.sinks.std1.channel = mem1 
    agent.sinks.std1.batchSize = 1 
    agent.sinks.std1.sink.directory = /var/log/flumeout/1 
    agent.sinks.std1.rollInterval = 0 

    agent.sinks.std2.type = file_roll 
    agent.sinks.std2.channel = mem2 
    agent.sinks.std2.batchSize = 1 
    agent.sinks.std2.sink.directory = /var/log/flumeout/2 
    agent.sinks.std2.rollInterval = 0 

    agent.channels.mem1.type = memory 
    agent.channels.mem1.capacity = 100 

    agent.channels.mem2.type = memory 
    agent.channels.mem2.capacity = 100 

但是,它不起作用!

當選擇器部件被移除時,flume的日誌中會有一些攔截器調試消息。 但是當選擇器和攔截器在一起時,沒有任何東西。

有沒有錯誤的表達或我錯過了什麼?

感謝您的閱讀。 :)

回答

1

我找到了。

在水槽日誌中,有如下警告信息。

 

    2013-10-10 16:34:20,514 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed tailsrc due to Failed to configure component! 

所以我已經連接了以下線

 

    agent.sources.tailsrc.channels = mem1 mem2 

,然後它的工作原理!!!!

+0

這是綁定源和chanel。還需要通過'agent.sinks.std1 = mem1'和'agent.sinks.std2 = mem2'來綁定sink和chanel –