2013-02-16 44 views
0

這裏的第一次堆棧溢出問題...會嘗試儘可能多地包含詳細信息。Flume Avro可以連接到Node.js服務器嗎?

我試圖通過Avro接收器將Apache Flume日誌數據傳輸到Node.js服務器,在特定的端口上偵聽。我打算使用Collective Media's node-avro library來幫助在Avro的二進制格式和JSON之間進行序列化,因此我可以使用Node.js中的數據(我通過socket.io pub/sub將它傳遞給客戶端)。

我很小心,因爲我看到數據流經通道並輸出到控制檯(僅用於調試,我也將數據彙集到控制檯),所以我已經正確配置了Flume。

2013-02-15 22:06:09,858 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to send events 
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:325) 
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) 
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Failed to send batch 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236) 
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:309) 
    ... 3 more 
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Exception thrown from remote handler 
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318) 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295) 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224) 
    ... 4 more 
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed 
    at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) 
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310) 
    ... 6 more 
Caused by: java.io.IOException: NettyTransceiver closed 
    at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338) 
    at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59) 
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:236) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:93) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) 
    at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:476) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:623) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:101) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38) 
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 
2013-02-15 22:06:14,895 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink k1: Building RpcClient with hostname: 127.0.0.1, port: 4242 

我不知道什麼:當我啓用的Avro水槽和造就了同一端口上偵聽Node.js的服務器,不過,水槽當它試圖做的Avro轉移拋出一個異常,是如何弄清楚我的Node.js服務是否至少得到了消息。我是很新,Node.js的,這樣沒有幫助,但這裏的代碼片段,設置了監聽器:

var flumeSink = require('http').createServer(flumeHandler); 
flumeSink.listen(8000); 
function flumeHandler (req, res) { 
    console.log("Got it!"); 
    //var schema = avro.prepareSchema("string"); 
    //var buffer = schema.encode("foo"); 
    //var value = schema.decode(buffer); 
} 

我想我已經設置了Node.js的側錯誤。我正在使用HTTP模塊,這可能不是正確的模塊。也許我需要考慮在Node.js中編寫自定義接收器?指針/幫助表示讚賞!

+0

閱讀更多內容,我發現Avro通過RPC工作,所以我認爲HTTP的確是錯誤的。 – ErikB 2013-02-16 04:39:28

+0

嘗試用dnode替換http偵聽器,但得到相同的錯誤。 – ErikB 2013-02-17 04:52:08

回答

0

在這種情況下,您可能並不需要avro接收器,因爲它旨在將Flume與Flume進行通信(這是您構建Flume連接拓撲的方式)。

如果你想創建一個不在標準列表中的接收器,你需要建立一個自定義接收器,並使用https://flume.apache.org/FlumeUserGuide.html#custom-sink 中定義的自定義配置,這個我試過了,它的工作原理完美。

或使用的東西存在:

https://github.com/josealvarezmuguerza/flume-http-sink

我從來沒有使用這個模塊。剛剛通過谷歌搜索來。

對於Avro部分,只需使用morphlines將源代碼轉換爲avro,然後將每個事件發佈到您的node.js服務器。

希望這給了一點點光。

去代碼!

相關問題