我正在嘗試爲flume-ng編寫自定義接收器。我查看了現有的接收器和文檔並對其進行了編碼。然而,應該接收事件的'process()'方法總是以null結束。 我在做Event event = channel.take();但事件爲空。我在日誌中看到,由於事件仍在通道中,因此會重複調用此方法。Flume-ng null事件的自定義接收器
有人能指出我正確的方向嗎?
我正在嘗試爲flume-ng編寫自定義接收器。我查看了現有的接收器和文檔並對其進行了編碼。然而,應該接收事件的'process()'方法總是以null結束。 我在做Event event = channel.take();但事件爲空。我在日誌中看到,由於事件仍在通道中,因此會重複調用此方法。Flume-ng null事件的自定義接收器
有人能指出我正確的方向嗎?
這是一個處理功能的骨架......如果你不能讓你回滾,事件狀態更改爲BACKOFF。如果不是,您承諾和設置狀態爲就緒。無論如何,你總是關閉交易。
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Event event = channel.take();
if (event != null && validEvent(event.getBody()) >= 0) {
# make some printing
}
transaction.commit();
status = Status.READY;
} catch (Throwable ex) {
transaction.rollback();
status = Status.BACKOFF;
logger.error("Failed to deliver event. Exception follows.", ex);
throw new EventDeliveryException("Failed to deliver event: " + ex);
} finally {
transaction.close();
}
return status;
我相信這會工作:)。
這是設計。水槽跑步者將通過null
事件輪詢水槽,以確保水槽處於活動狀態並準備好接受未來事件。當您收到null
事件時,請確保您返回Status.BACKOFF
,並且接收處理器在再次嘗試之前會稍等一會。
奇怪的是[文檔](http://flume.apache.org/FlumeDeveloperGuide.html#sink)沒有提到這一點。 – Dmitry 2013-04-03 21:08:28
我同意。 Flume文檔非常小,應該更詳細些。 – logicalgeek 2016-06-01 07:25:16
退避持續時間是多少?它是如何控制的? AbstractSink類不實現像Source這樣的方法。 public long getBackOffSleepIncrement() public long getMaxBackOffSleepInterval( – bearrito 2017-04-26 03:30:27
真棒。謝謝。它仍然幫助我在2016 .. – logicalgeek 2016-06-01 07:23:34
嘿我有一個類似的問題在這裏:https://stackoverflow.com/questions/46479157/streaming-kafka- messages-to-mysql-database 你對此有什麼想法嗎? – 2017-09-30 10:21:02