Flume exec to cat file only writes one line to hdfs sink

I am new to Flume. I have a large CSV text file of records; each line is roughly 50 characters long and terminated by CR-LF. I want to ingest this data into HDFS with Flume. The result is that only one line from the file gets written to HDFS (if it helps, it is the second line of the file).

I do not see any errors in the output. Thanks. Details below.

Here is the command I run:

flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

And my configuration:

a1.sources = r1 
a1.sinks = k1 k2 
a1.channels = c1 

a1.sources.r1.type = exec 
a1.sources.r1.shell = /bin/bash -c 
a1.sources.r1.command = cat /Users/david/flume/testdata.txt 
a1.sources.r1.interceptors = a 
a1.sources.r1.interceptors.a.type = org.apache.flume.interceptor.TimestampInterceptor$Builder 

# Describe the sinks 
a1.sinks.k1.type = logger 

a1.sinks.k2.type = hdfs 
a1.sinks.k2.channel = c1 
a1.sinks.k2.hdfs.fileType = DataStream 
a1.sinks.k2.hdfs.writeFormat = Text 
# a1.sinks.k2.hdfs.path = /flume 
a1.sinks.k2.hdfs.path = /flume/trades/%y-%m-%d/%H%M/%S 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

And the output:

13/07/03 22:15:34 INFO lifecycle.LifecycleSupervisor: Starting lifecycle supervisor 1 
13/07/03 22:15:34 INFO node.FlumeNode: Flume node starting - a1 
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Node manager starting 
13/07/03 22:15:34 INFO lifecycle.LifecycleSupervisor: Starting lifecycle supervisor 9 
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: Configuration provider starting 
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: Reloading configuration file:example.conf 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k1 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k1 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Added sinks: k1 k2 Agent: a1 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2 
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1] 
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: Creating channels 
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: c1, registered successfully. 
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: created channel c1 
13/07/03 22:15:34 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger 
13/07/03 22:15:34 INFO sink.DefaultSinkFactory: Creating instance of sink: k2, type: hdfs 
2013-07-03 22:15:34.777 java[5903:5703] Unable to load realm info from SCDynamicStore 
13/07/03 22:15:34 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false 
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: k2, registered successfully. 
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } }, k2=SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} } 
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel c1 
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started 
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink k1 
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink k2 
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started 
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Source r1 
13/07/03 22:15:34 INFO source.ExecSource: Exec source starting with command:cat /Users/david/flume/testdata.txt 
13/07/03 22:15:34 INFO source.ExecSource: Command [cat /Users/david/flume/testdata.txt] exited with 0 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 30 38 2F 30 35 2F 32 002785A9,08/05/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 30 38 2F 30 34 2F 32 002785A9,08/04/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 37 2F 32 002785A9,10/07/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 34 2F 32 002785A9,10/04/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 35 2F 32 002785A9,10/05/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 36 2F 32 002785A9,10/06/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 37 38 38 36 2C 31 30 2F 30 34 2F 32 00277886,10/04/2 } 
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 30 30 35 44 42 33 2C 30 39 2F 31 39 2F 32 00005DB3,09/19/2 } 
13/07/03 22:15:35 INFO hdfs.BucketWriter: Creating /flume/trades/13-07-03/2215/34/FlumeData.1372904134974.tmp 
13/07/03 22:16:05 INFO hdfs.BucketWriter: Renaming /flume/trades/13-07-03/2215/34/FlumeData.1372904134974.tmp to /flume/trades/13-07-03/2215/34/FlumeData.1372904134974 

You may get more reliable results with a spooling directory source. It is designed to watch a directory for new files and forward them through Flume. Note that the files must not change once they land there; if they do, this source will complain loudly. – Sarge
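For anyone who wants to try that, a minimal spooling-directory variant of the source section might look like the sketch below (the spoolDir path is only a placeholder for illustration; point it at a directory you drop finished files into):

# Sketch: replace the exec source with a spooling directory source
# /Users/david/flume/spool is a placeholder path, not from the original post
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /Users/david/flume/spool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1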

Answers


I was scratching my head over this. It turns out I needed a more explicit configuration file, in particular the hdfs.batchSize, rollCount, rollSize and rollInterval settings on the HDFS sink. It now looks like this:

# Example command to run: 
# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k2 
a1.channels = c1 

# Describe/configure the source 
# a1.sources.r1.type = netcat 
# a1.sources.r1.bind = localhost 
# a1.sources.r1.port = 44444 
a1.sources.r1.type = exec 
a1.sources.r1.shell = /bin/bash -c 
a1.sources.r1.command = cat /Users/david/flume/testdata.txt 
a1.sources.r1.interceptors = a 
a1.sources.r1.interceptors.a.type = org.apache.flume.interceptor.TimestampInterceptor$Builder 

# Describe the sinks 
a1.sinks.k1.type = logger 

a1.sinks.k2.type = hdfs 
a1.sinks.k2.channel = c1 
a1.sinks.k2.hdfs.fileType = DataStream 
a1.sinks.k2.hdfs.batchSize = 2000 
a1.sinks.k2.hdfs.rollCount = 5000 
a1.sinks.k2.hdfs.rollSize = 1000000 
a1.sinks.k2.hdfs.rollInterval = 10 
a1.sinks.k2.hdfs.writeFormat = Text 
# a1.sinks.k2.hdfs.path = /flume 
a1.sinks.k2.hdfs.path = /flume/trades/%y-%m-%d/%H%M/%S 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1 
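To double-check that every record actually landed in HDFS, something like the commands below works (the bucketed path is taken from the run shown above; the timestamps and file names will differ on your machine):

# List the bucketed output and count the ingested lines
hdfs dfs -ls /flume/trades/13-07-03/2215/34/
hdfs dfs -cat /flume/trades/13-07-03/2215/34/FlumeData.* | wc -l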

I ran into the same problem. My theory is that once the source fills the channel, that batch is flushed to HDFS, but somehow the channel never frees up again, so the remaining records never show up in HDFS.
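If that theory holds, one thing worth trying (a guess on my part, not a verified fix) is to give the memory channel enough headroom to hold the whole file in one pass:

# Sketch: a larger memory channel; the numbers are illustrative only
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000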


If you want to get streaming data from a cat'ed file, the file has to sit in the flume-ng conf directory before you run the agent.