2014-02-26 31 views
2

我有一個使用jeromq 0.3.2在Java中編寫的多線程應用程序。我正在將它放入Tanuki服務包裝中,以便它作爲一個Windows服務運行,並且在乾淨地停止線程時遇到了一些麻煩。代理的run()方法是從the guideSimple Pirate格局的變化,看起來像這樣:乾淨地打斷zeromq輪詢線程

public void run() 
{ 
    ZContext context = new ZContext(); 
    Socket frontend = context.createSocket(ZMQ.ROUTER); 
    Socket backend = context.createSocket(ZMQ.ROUTER); 
    frontend.bind("tcp://*:5555"); 
    backend.bind("inproc://workers"); 

    while (!Thread.currentThread().isInterrupted()) 
    { 
     ZMQ.Poller poller = new ZMQ.Poller(2); 
     poller.register(frontend, ZMQ.Poller.POLLIN); 
     poller.register(backend, ZMQ.Poller.POLLIN); 
     poller.poll(); 
     if (poller.pollin(0)) 
     { 
      processFrontend(frontend, backend, context); 
     } 
     if (poller.pollin(1)) 
     { 
      processBackend(frontend, backend); 
     } 
    } 

    // additonal code to close worker threads 
} 

我怎樣才能乾淨地從這個循環退出時,控制包裝要停止應用程序?

如果當前沒有連接的客戶端,則在調用poller.poll()時會阻止該循環,因此如果包裝器在線程上調用interrupt(),它將被忽略。如果有客戶目前在發送郵件,然後interrupt()調用導致poller.poll()方法拋出zmq.ZError$IOException: java.nio.channels.ClosedByInterruptException

我也試着使用:

PollItem[] items = { 
    new PollItem(frontend, Poller.POLLIN), 
    new PollItem(backend, Poller.POLLIN) 
}; 
int rc = ZMQ.poll(items, 2, -1); 

if (rc == -1) 
    break; 

if (items[0].isReadable()) 
{ 
    processFrontend(frontend, backend, context); 
} 

if (items[1].isReadable()) 
{ 
    processBackend(frontend, backend); 
} 

但調用ZMQ.poll展品相同的行爲。兩種可能的方案是:

  • 設置超時上ZMQ.poll和包裝在一個try/catch爲IOException異常的run()方法的內容。
  • 向我的Runnable添加一個方法,它將連接到前端併發送一條特殊消息,該消息將在processFrontend中讀取並導致代碼跳出循環。

第一個看起來有點髒,第二個感覺有點脆弱。我應該考慮其他方法嗎?是否有一些其他的輪詢方法可以用來更乾淨地迴應線程中斷?

回答

2

這是更好地提取循環到單獨的線程,並打開其他INPROC PULL插座,這樣從3個插座迴路調查的消息:frontendbackendcontrol。如果主線程收到中斷信號,則應發送STOP命令到代理循環。

正是這種模式在我的項目實施:https://github.com/thriftzmq/thriftzmq-java/blob/master/thriftzmq/src/main/java/org/thriftzmq/ProxyLoop.java

在主線程您可以添加一個shutdownHook,並從內部將其發送STOP命令循環。

Runtime.getRuntime().addShutdownHook(new Thread() { 
    @Override 
    public void run() { 
     try { 
      //Connect to controlSocket and send STOP command 
      ZMQ.Socket peer = context.socket(ZMQ.PUSH); 
      peer.connect(CONTROL_ENDPOINT); 
      peer.send(STOP_MESSAGE, 0); 
      peer.close(); 
     } catch (Exception ex) { 
      logger.error("Failed to stop service", ex); 
      System.exit(1); 
     } 
    } 
}); 

P.S.使用來自guava-library的基元來管理服務生命週期,使生活變得更加簡單。

+0

雖然我沒有最終使用關閉掛鉤的方法,但我確實使用了額外的'control'套接字模式來顯着簡化應用程序。您提供的鏈接中的代碼非常有助於爲我提供其他方法的想法。 –