我寫了一個名爲MySink的自定義flume sink,其處理方法在下面的第一個代碼段中指出。我得到一個IllegalStateException如下(詳細的堆棧跟蹤的第二個片段是可用下圖):FLUME IllegalStateException:begin()在事務處於OPEN狀態時調用
致:java.lang.IllegalStateException:開始()調用時 交易是開放的!
問題:我按照KafkaSink和在水槽代碼庫現有類似水槽的實現,而寫的過程的方法和我申請非常相同的事務處理邏輯與那些離開水槽。你能告訴我這裏的過程方法有什麼問題嗎?我該如何解決這個問題?
PROCESS方法(I已標記在拋出異常):
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
Event event = null;
try {
LOG.info(getName() + " BEFORE txn.begin()");
//!!!! EXCEPTION IS THROWN in the following LINE !!!!!!
txn.begin();
LOG.info(getName() + " AFTER txn.begin()");
LOG.info(getName() + " BEFORE ch.take()");
event = ch.take();
LOG.info(getName() + " AFTER ch.take()");
if (event == null) {
// No event found, request back-off semantics from the sink runner
LOG.info(getName() + " - EVENT is null! ");
return Status.BACKOFF;
}
Map<String, String> keyValueMapInTheMessage = event.getHeaders();
if (!keyValueMapInTheMessage.isEmpty()) {
mDBWriter.insertDataToDB(keyValueMapInTheMessage);
}
LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
if (txn != null) {
txn.commit();
}
} catch (Exception ex) {
String errMsg = getName() + " - Failed to publish events. Exception: ";
LOG.info(errMsg);
status = Status.BACKOFF;
if (txn != null) {
try {
txn.rollback();
} catch (Exception e) {
LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errMsg, ex);
} finally {
if (txn != null) {
txn.close();
}
}
return status;
}
異常堆棧:
2016-01-22 14:01:15,440 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: MySink - Failed to publish events. Exception: at com.XYZ.flume.maprdb.MySink.process(MySink.java:116)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at com.XYZ.flume.maprdb.MySink.process(MySink.java:82)
... 3 more
我已經發現問題的起源。我使用MapR的OJAI API來存儲接收器所接收的數據,如果我刪除存儲數據的行,則看起來異常消失,即: mDBWriter.insertDataToDB(keyValueMapInTheMessage); 我在調查這個OJAI API中的事情是什麼導致事務搞砸了。 –
我有一個自定義接收器的類似問題。你有沒有解決這個問題? – bearrito