2013-10-01 165 views
0

我是Flume-ng的新手。我必須編寫一個程序,它可以將文本文件傳輸到其他程序(代理程序)。我知道我們必須知道代理,即主機IP,端口號等等。然後應該定義一個信源,一個信道和一個信道。我只想將日誌文件傳輸到服務器。我的客戶代碼如下。 公共類MyRpcClientFacade {Flume:將數據傳輸到服務器

public class MyClient{ 

    private RpcClient client; 
    private String hostname; 
    private int port; 

    public void init(String hostname, int port) { 
     this.hostname = hostname; 
     this.port = port; 
     this.client = RpcClientFactory.getDefaultInstance(hostname, port); 

     } 

     public void sendDataToFlume(String data) { 
     Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); 
     try { 
      client.append(event); 
     } catch (EventDeliveryException e) { 
      client.close(); 
      client = null; 
      client = RpcClientFactory.getDefaultInstance(hostname, port); 
     } 
     } 

     public void cleanUp() { 
     client.close(); 
     } 
} 

上面的代碼只能String數據發送到指定的進程。但我必須發送文件。另外請告訴我,Source,Channel and Sink是否必須寫入服務器?如果是這樣,如何配置和寫這三個。請幫幫我。給小樣本Source,Sink And Channel

回答

0

其實你只需要在每個節點上獲得flume客戶端。然後,您提供一個配置文件,提供有關其行爲的信息。例如,如果您的節點讀取文件(讀取每個新行並將它們作爲事件發送到通道),並通過RPC套接字發送文件內容。您的配置將是這樣的:

# sources/sinks/channels list 
    <Agent>.sources = <Name Source1> 
    <Agent>.sinks = <Name Sink1> 
    <Agent>.channels = <Name Channel1> 
    # Channel attribution to a source 
    <Agent>.sources.<Name Source1>.channels = <Name Channel1> 
    # Channel attribution to sink 
    <Agent>.sinks.<Name Sink1>.channels = <Name Channel1> 
    # Configuration (sources,channels and sinks) 
    # Source properties : <Name Source1> 
    <Agent>.sources.<Name Source1>.type = exec 
    <Agent>.sources.<Name Source1>.command = tail -F test 
    <Agent>.sources.<Name Source1>.channels = <Name Channel1> 
    # Channel properties : <Name Channel1> 
    <Agent>.channels.<Name Channel1>.type = memory 
    <Agent>.channels.<Name Channel1>.capacity = 1000 
    <Agent>.channels.<Name Channel1>.transactionCapacity = 1000 
    # Sink properties : <Name Sink1> 
    <Agent>.sinks.<Nom Sink1>.type = avro 
    <Agent>.sinks.<Nom Sink1>.channel = <Nom Channel1> 
    <Agent>.sinks.<Nom Sink1>.hostname = <HOST NAME or IP> 
    <Agent>.sinks.<Nom Sink1>.port = <PORT NUMBER> 

然後,你將需要設置一個代理,這將讀取同一個端口上的Avro源和處理事件要存儲它們的方式。我希望它有幫助;)

相關問題