2013-03-10 50 views
5

我正在嘗試爲flume-ng編寫自定義接收器。我查看了現有的接收器和文檔並對其進行了編碼。然而,應該接收事件的'process()'方法總是以null結束。 我在做Event event = channel.take();但事件爲空。我在日誌中看到,由於事​​件仍在通道中,因此會重複調用此方法。Flume-ng null事件的自定義接收器

有人能指出我正確的方向嗎?

回答

5

這是一個處理功能的骨架......如果你不能讓你回滾,事件狀態更改爲BACKOFF。如果不是,您承諾和設置狀態爲就緒。無論如何,你總是關閉交易。

Status status = null; 
    Channel channel = getChannel(); 
    Transaction transaction = channel.getTransaction(); 
    transaction.begin(); 
    try { 
     Event event = channel.take(); 

     if (event != null && validEvent(event.getBody()) >= 0) { 
      # make some printing 
     } 
     transaction.commit(); 
     status = Status.READY; 
    } catch (Throwable ex) { 
     transaction.rollback(); 
     status = Status.BACKOFF; 
     logger.error("Failed to deliver event. Exception follows.", ex); 
     throw new EventDeliveryException("Failed to deliver event: " + ex); 
    } finally { 
     transaction.close(); 
    } 
    return status; 

我相信這會工作:)。

+0

真棒。謝謝。它仍然幫助我在2016 .. – logicalgeek 2016-06-01 07:23:34

+0

嘿我有一個類似的問題在這裏:https://stackoverflow.com/questions/46479157/streaming-kafka- messages-to-mysql-database 你對此有什麼想法嗎? – 2017-09-30 10:21:02

4

這是設計。水槽跑步者將通過null事件輪詢水槽,以確保水槽處於活動狀態並準備好接受未來事件。當您收到null事件時,請確保您返回Status.BACKOFF,並且接收處理器在再次嘗試之前會稍等一會。

+0

奇怪的是[文檔](http://flume.apache.org/FlumeDeveloperGuide.html#sink)沒有提到這一點。 – Dmitry 2013-04-03 21:08:28

+0

我同意。 Flume文檔非常小,應該更詳細些。 – logicalgeek 2016-06-01 07:25:16

+0

退避持續時間是多少?它是如何控制的? AbstractSink類不實現像Source這樣的方法。 public long getBackOffSleepIncrement() public long getMaxBackOffSleepInterval( – bearrito 2017-04-26 03:30:27