2016-01-22 50 views
3

我寫了一個名爲MySink的自定義flume sink,其處理方法在下面的第一個代碼段中指出。我得到一個IllegalStateException如下(詳細的堆棧跟蹤的第二個片段是可用下圖):FLUME IllegalStateException:begin()在事務處於OPEN狀態時調用

致:java.lang.IllegalStateException:開始()調用時 交易是開放的!

問題:我按照KafkaSink和在水槽代碼庫現有類似水槽的實現,而寫的過程的方法和我申請非常相同的事務處理邏輯與那些離開水槽。你能告訴我這裏的過程方法有什麼問題嗎?我該如何解決這個問題?

PROCESS方法(I已標記在拋出異常):

@Override 
public Status process() throws EventDeliveryException { 
    Status status = Status.READY; 
    Channel ch = getChannel(); 
    Transaction txn = ch.getTransaction(); 
    Event event = null; 

    try { 
     LOG.info(getName() + " BEFORE txn.begin()"); 
    //!!!! EXCEPTION IS THROWN in the following LINE !!!!!! 
     txn.begin(); 
     LOG.info(getName() + " AFTER txn.begin()"); 
     LOG.info(getName() + " BEFORE ch.take()"); 
     event = ch.take(); 
     LOG.info(getName() + " AFTER ch.take()"); 

     if (event == null) { 
      // No event found, request back-off semantics from the sink runner 
      LOG.info(getName() + " - EVENT is null! "); 
      return Status.BACKOFF; 
     } 

     Map<String, String> keyValueMapInTheMessage = event.getHeaders(); 
     if (!keyValueMapInTheMessage.isEmpty()) { 
      mDBWriter.insertDataToDB(keyValueMapInTheMessage); 
     } 

     LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event)); 
     if (txn != null) { 
      txn.commit();     
     } 

    } catch (Exception ex) { 
     String errMsg = getName() + " - Failed to publish events. Exception: "; 
     LOG.info(errMsg); 
     status = Status.BACKOFF; 
     if (txn != null) { 
      try { 
       txn.rollback(); 
      } catch (Exception e) { 
       LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event)); 
       throw Throwables.propagate(e); 
      } 
     } 
     throw new EventDeliveryException(errMsg, ex); 
    } finally { 
     if (txn != null) { 
      txn.close(); 
     } 
    } 

    return status; 
} 

異常堆棧:

2016-01-22 14:01:15,440 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] 

Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: MySink - Failed to publish events. 
Exception: at com.XYZ.flume.maprdb.MySink.process(MySink.java:116) 
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) 
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN! 
at com.google.common.base.Preconditions.checkState(Preconditions.java:145) 
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131) 
at com.XYZ.flume.maprdb.MySink.process(MySink.java:82) 
... 3 more 
+0

我已經發現問題的起源。我使用MapR的OJAI API來存儲接收器所接收的數據,如果我刪除存儲數據的行,則看起來異常消失,即: mDBWriter.insertDataToDB(keyValueMapInTheMessage); 我在調查這個OJAI API中的事情是什麼導致事務搞砸了。 –

+0

我有一個自定義接收器的類似問題。你有沒有解決這個問題? – bearrito

回答

0

if (event == null) { 
 
    // No event found, request back-off semantics from the sink runner 
 
    LOG.info(getName() + " - EVENT is null! "); 
 
    return Status.BACKOFF; 
 
}

這個代碼將導致此問題。當事件爲null時,您只需返回它。但正確的方法是提交或回滾。事務應該經歷三個階段:begin,commit或rollback,最後close.we可以看到以下源代碼以查找它如何實現。

BasicChannelSemantics:

public Transaction getTransaction() { 
 

 
    if (!initialized) { 
 
     synchronized (this) { 
 
     if (!initialized) { 
 
      initialize(); 
 
      initialized = true; 
 
     } 
 
     } 
 
    } 
 

 
    BasicTransactionSemantics transaction = currentTransaction.get(); 
 
    if (transaction == null || transaction.getState().equals(
 
      BasicTransactionSemantics.State.CLOSED)) { 
 
     transaction = createTransaction(); 
 
     currentTransaction.set(transaction); 
 
    } 
 
    return transaction; 
 
    }

的currentTransaction時爲空或它的狀態是關閉,通道將創建一個新的,否則返回舊的。這個例外不會立即發生。當第一次執行處理方法時,會得到一個新的事務,但事件爲空,只是返回並最終關閉,close方法因其實現而不起作用,因此第二次執行處理方法時,您不會沒有得到新的交易,它是舊的。以下代碼是關於交易實施的。

BasicTransactionSemantics:

protected BasicTransactionSemantics() { 
 
    state = State.NEW; 
 
    initialThreadId = Thread.currentThread().getId(); 
 
    } 
 
    
 
    public void begin() { 
 
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, 
 
     "begin() called from different thread than getTransaction()!"); 
 
    Preconditions.checkState(state.equals(State.NEW), 
 
     "begin() called when transaction is " + state + "!"); 
 

 
    try { 
 
     doBegin(); 
 
    } catch (InterruptedException e) { 
 
     Thread.currentThread().interrupt(); 
 
     throw new ChannelException(e.toString(), e); 
 
    } 
 
    state = State.OPEN; 
 
    } 
 

 
    public void commit() { 
 
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, 
 
     "commit() called from different thread than getTransaction()!"); 
 
    Preconditions.checkState(state.equals(State.OPEN), 
 
     "commit() called when transaction is %s!", state); 
 

 
    try { 
 
     doCommit(); 
 
    } catch (InterruptedException e) { 
 
     Thread.currentThread().interrupt(); 
 
     throw new ChannelException(e.toString(), e); 
 
    } 
 
    state = State.COMPLETED; 
 
    } 
 

 
public void rollback() { 
 
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, 
 
     "rollback() called from different thread than getTransaction()!"); 
 
    Preconditions.checkState(state.equals(State.OPEN), 
 
     "rollback() called when transaction is %s!", state); 
 

 
    state = State.COMPLETED; 
 
    try { 
 
     doRollback(); 
 
    } catch (InterruptedException e) { 
 
     Thread.currentThread().interrupt(); 
 
     throw new ChannelException(e.toString(), e); 
 
    } 
 
    } 
 
    
 
public void close() { 
 
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId, 
 
     "close() called from different thread than getTransaction()!"); 
 
    Preconditions.checkState(
 
      state.equals(State.NEW) || state.equals(State.COMPLETED), 
 
      "close() called when transaction is %s" 
 
      + " - you must either commit or rollback first", state); 
 

 
    state = State.CLOSED; 
 
    doClose(); 
 
    }

創建的時候,國家是新的。

開始時,狀態必須是新的,然後狀態變爲開放狀態。

當提交或回滾時,狀態必須打開,然後狀態變爲完整。

關閉時,狀態必須完整,然後狀態變得接近。

所以當你以正確的方式執行close方法時,下一次你會得到一個新的事務,否則舊的那個狀態不能是新的,所以你不能執行transaction.begin(),它需要一個新的。

相關問題