1

我有一些服務,既從入站隊列中消耗,又產生到一些出站隊列(由此服務創建的另一個線程拾取消息並將它們「傳輸」到目的地)。如何在Java中運行輪詢阻止消費者?

目前我使用兩個普通的Thread s,如下面的代碼所示,但我知道一般情況下不應該再使用它們,而應該使用ExecutorService等更高級別的抽象。

這對我而言是否合理?更具體地說,我的意思是 - >

  • 它會減少代碼嗎?
  • 使代碼在失敗的情況下更健壯?
  • 允許更平滑的線程終止? (這在運行測試時是有幫助的)

我在這裏錯過了一些重要的東西嗎?

// called on service startup 
private void init() { 
    // prepare everything here 
    startInboundWorkerThread(); 
    startOutboundTransporterWorkerThread(); 
} 

private void startInboundWorkerThread() { 
    InboundWorkerThread runnable = injector.getInstance(InboundWorkerThread.class); 
    inboundWorkerThread = new Thread(runnable, ownServiceIdentifier); 
    inboundWorkerThread.start(); 
} 

// this is the Runnable for the InboundWorkerThread 
// the runnable for the transporter thread looks almost the same 
@Override 
public void run() {  
    while (true) { 
     InboundMessage message = null; 
     TransactionStatus transaction = null; 

     try { 
      try { 
       transaction = txManager.getTransaction(new DefaultTransactionDefinition()); 
      } catch (Exception ex) { 
       // logging 
       break; 
      } 

      // blocking consumer 
      message = repository.takeOrdered(template, MESSAGE_POLL_TIMEOUT_MILLIS); 
      if (message != null) {     
       handleMessage(message); 
       commitTransaction(message, transaction); 
      } else { 
       commitTransaction(transaction); 
      } 
     } catch (Exception e) { 
      // logging 
      rollback(transaction); 
     } catch (Throwable e) { 
      // logging 
      rollback(transaction); 
      throw e; 
     } 

     if (Thread.interrupted()) { 
      // logging 
      break; 
     } 
    } 

    // logging 
} 

// called when service is shutdown 
// both inbound worker thread and transporter worker thread must be terminated 
private void interruptAndJoinWorkerThread(final Thread workerThread) { 
    if (workerThread != null && workerThread.isAlive()) { 
     workerThread.interrupt(); 

     try { 
      workerThread.join(TimeUnit.SECONDS.toMillis(1)); 
     } catch (InterruptedException e) { 
      // logging 
     } 
    } 
} 

回答

0

使用線程池,我的主要好處來自構建線程的單一的,獨立的和一般的短作業的工作和更好的抽象的ThreadPool(從java.util.concurrent中梅比一些其他類) s私人Worker s。有時你可能想要更直接地訪問這些人,以確定他們是否仍在運行等。但通常有更好的方法來做到這一點。

至於處理故障,您可能需要提交自己的ThreadFactory與自定義UncaughtExceptionHandler創建線程,並在一般情況下,你的Runnable工作應提供良好的異常處理,也以記錄有關特定作業的更多信息失敗。 使這些作業非阻塞,因爲你不想用被阻止的工作者填滿你的ThreadPool。在作業排隊之前移動阻止操作。

正常情況下,shutdownshutdownNowExecutorService提供,並結合作業中正確的中斷處理將允許平滑的作業終止。

相關問題