2014-11-01 58 views
1

我有一個假脫機目錄,其中存在所有json文件,傳入文件將每秒添加到此目錄中,並且我必須反序列化傳入的json文件並獲取require字段並將它附加到HDFS目錄中。反序列化Json文件並使用槽進入HDFS

我所做的是創建了一個flume conf文件,其中將spooling目錄中的文件作爲源文件,並使用1 Sink將json文件直接放入HDFS。

我必須在接收器之前將此json轉換爲結構格式並將其放置到HDFS中。 最重要的是,它不是一個Twitter數據。我必須純粹實施Flume。

我用下面的水槽配置,以完成這項工作:

agent_slave_1.channels.fileChannel1_1.type = file 
agent_slave_1.channels.fileChannel1_1.capacity = 200000 
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000 
agent_slave_1.sources.source1_1.type = spooldir 

agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/ 
agent_slave_1.sources.source1_1.fileHeader = false 
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED 
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs 
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/ 
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000 
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456 
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0 
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000 
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text 

agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream 
agent_slave_1.sources.source1_1.channels = fileChannel1_1 
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1 

agent_slave_1.sinks = hdfs-sink1_1 
agent_slave_1.sources = source1_1 
agent_slave_1.channels = fileChannel1_1 

但我不知道如何使用解串器。

有人可以幫我一個想法如何反序列化Incomming Json文件?如果我需要在java中編寫任何代碼,請幫助我,我需要使用哪個接口?如果可能的話給一些提示。

+0

你有沒有找到任何答案,請分享。 – 2014-11-06 06:58:21

+0

不,我也在尋找同樣的事情。 – user3782364 2014-11-06 13:27:33

回答

1

最好的猜測是編寫一個自定義攔截器,將您的JSON轉換爲所需的HDFS格式。它還具有填充可在hdfs路徑中使用的標題的優點。

這裏是如何配置的攔截器:

agent_slave_1.sources.source1_1.interceptors = my_intercptor 
agent_slave_1.sources.source1_1.interceptors.my_intercptor.type = com.mycompany.MyInteceptor 

類是這樣的:

public class MyInteceptor implements Interceptor, Interceptor.Builder { 

    private MyInteceptor interceptor; 

    @Override 
    public void initialize() { 


    } 

    @Override 
    public Event intercept(Event event) { 
     String bjson = event.getBody())); 
     // decode your json, e.g. Jackson 
     MyDecodedJsonObject record; // pseudo class 
     event.getHeaders().put("timestamp", record.getTimestamp().toString()); 
     String newBody = record.getA() + "\t" + record.getB(); 
     event.setBody(newBody.getBytes()) 
     return event; 
    } 

    @Override 
    public List<Event> intercept(List<Event> events) { 

     for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();) { 
      Event next = intercept(iterator.next()); 
      if (next == null) { 
       iterator.remove(); 
      } 
     } 
     return events; 
    } 

    @Override 
    public void close() { 


    } 

    @Override 
    public Interceptor build() { 
     return interceptor; 
    } 

    @Override 
    public void configure(Context context) { 

     interceptor = new MyInteceptor(); 
    } 

} 

不要忘記在一個罐子來包裝這個類,並把它放到水槽的lib目錄。