2013-02-12 43 views
5

我是新來的Apache水槽。
我想看看我可以得到一個json(作爲http源),解析它並根據內容將其存儲到hdfs上的動態路徑。
例如:
如果JSON是:apache flume hdfs可以接受動態路徑寫入嗎?

[{ 
    "field1" : "value1", 
    "field2" : "value2" 
}] 

則HDFS路徑將是:
/一些默認根路徑/ VALUE1/VALUE2 /一些值名稱管理文件
是否有這樣的水槽配置使我能夠做到這一點?

這裏是我當前的配置(通過HTTP接受JSON,並將其存儲在一個路徑根據時間戳):

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

謝謝!

回答

8

的解決方案是在水槽documentation for the hdfs sink

這裏是修改後的配置:

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%{field1} 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

和捲曲:

curl -X POST -d '[{ "headers" : {   "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"   }, "body" : "random_body" }]' localhost:9000 
+0

我使用不同的源(RabbitMQ的)和I我自己傳遞了一個JSON負載。你描述的方法似乎不適用於我的情況。我假設,除非你遇到類似的問題,否則我的一切都是錯誤的 – 2013-06-01 17:51:55

+0

謝謝,這對我有效 – 2013-06-01 19:35:13