2016-12-28 28 views
1

我想使用Flume在Python腳本中收集日誌,因此我按照用戶指南配置了使用netcat源的Flume,然後使用telnet和nc進行測試,它運作良好。在使用python套接字或telnet工作時無法正確獲取事件

我的配置代碼:

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
a1.sinks.k1.type = logger 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

然後我使用Python連接水槽,併發送一些話這樣說:

import socket 
def netcat(hostname, port): 
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    s.connect((hostname, port)) 
    s.send("test words 1\n") 
    s.send("test words 2\n") 
    s.send("test words 3\n") 
    s.send("test words 4\n") 
    s.shutdown(socket.SHUT_WR) 
    s.close() 

if _name_ == "_main_": 
    netcat("127.0.0.1",44444) 

問題發生,水槽只能接收2行。 水槽日誌:

2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31 test words 1 } 2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32 test words 2 }

我有兩個Ubuntu的& Java1.8和CentOS &的Java 1.7,並在Python的telnet模型相同的結果相同的結果。

配置或Python腳本有什麼問題嗎?或者任何人有這種情況下的建議?

回答

2

發生這種情況的原因是因爲您並未等待響應回來。默認情況下,Flume的netcat源將爲每個事件發回「OK」消息。在可以發送響應之前,您正在終止連接,這會導致進一步的消息處理失敗(因爲管道已從客戶端斷開)。

要解決這個問題,你需要進行以下更改您的flume.conf:

a1.sources.r1.ack-every-event=false 

這樣就要求一個「OK」發送,並因此停止失敗。

或者,您可以更改您的Python以等待每次在關閉連接前發送「確定」消息。人爲地,在中增加一個睡眠聲明,應該也解決了這個問題,雖然你會假設處理你的消息需要多長時間。通常很好,但可能會有其他情況導致處理延遲。

相關問題