2014-09-01 88 views
4

這是我第一次在Java中創建一個多線程應用程序,它將持續運行直到取消,並且無法關閉/中斷我的線程。如何中斷循環中阻塞的可調用線程?

我有一些線程與Mediator進行通信,該Mediator封裝了TransferQueue和ExecutorService,並且促進了生產線程和消費線程之間的通信。

我使用這個調解器而不是Future,因爲TransferQueue對於處理多個生產者的單個消費者(生產者線程可以隨時調用mediator.put(E e))和消費者線程可以在E e = mediator.take()上等待某些東西可用),並且我不想浪費CPU週期輪詢。

設計非常乾淨,快速和有效,但是我在中斷阻塞queue.take(),serverSocket.accept()並中斷整個線程時遇到了問題。


生產者:

public class SocketProducer implements Colleague<Socket> { 
    private Mediator<Socket> mediator; 
    private ServerSocket serverSocket; 
    private Integer listeningPort; 

    private volatile boolean runnable = true; 

    public SocketProducer(Mediator<Socket> mediator) { 
     this.mediator = mediator; 
    } 

    public Colleague<Socket> setListeningPort(Integer listeningPort) { 
     this.listeningPort = listeningPort; 
     return this; 
    } 

    public Void call() throws Exception { 
     serverSocket = new ServerSocket(listeningPort, 10); 

     while (runnable) { 
      Socket socket = serverSocket.accept(); // blocks until connection 

      mediator.putIntoQueue(socket); 
     } 

     return null; 
    } 

    public void interrupt() { 
     // ? 
     runnable = false; 
     serverSocket.close(); 
     // ? 
    } 
} 

和消費者:

private class SocketConsumer implements Colleague<Socket> { 
    private Mediator<Socket> mediator; 
    private volatile boolean runnable = true; 

    public SomeConsumer(Mediator<Socket> mediator) { 
     this.mediator = mediator; 
    } 

    public Void call() throws Exception { 
     while (runnable) { 
      Socket socket = mediator.takeFromQueue(); // blocks until element is in queue 
     } 

     return null; 
    } 

    public void interrupt() { 
     // ? 
     runnable = false; 
     // ? 
    } 
} 

同事接口只是擴展可贖回,給一些額外的功能給中介管理其生產/消費者同事(即:要求:每個同事。中斷())。

我已經嘗試了很多方法,在各個地方拋出InterruptedException,在各個地方捕獲InterruptedException,讓線程返回其中斷的中介的線程實例。我所嘗試過的所有東西都是如此無效,以至於我覺得我錯過了這個難題的一些關鍵部分。

到目前爲止,我所見過的最有效的方法是毒丸(如果隊列沒有將NPE置於空插入狀態,這將非常棒),以及我嘗試引入毒物的所有方法由於ClassCastException(嘗試將Object轉換爲套接字,試圖實例化一個通用套接字等)而失敗。

我真的不知道該從哪裏出發。我真的希望能夠按需要乾淨地終止這些線程。


完成的解決方案:

public class SocketProducer implements Colleague<Socket> { 
    private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName()); 
    private Mediator<Socket> mediator; 
    private ServerSocket serverSocket; 
    private Integer listeningPort; 

    private volatile boolean runnable = true; 

    public SocketProducer(Mediator<Socket> mediator) { 
     this.mediator = mediator; 
    } 

    public Colleague<Socket> setListeningPort(Integer listeningPort) { 
     this.listeningPort = listeningPort; 
     return this; 
    } 

    public Void call() throws Exception { 
     serverSocket = new ServerSocket(listeningPort, 10); 
     logger.info("Listening on port " + listeningPort); 

     while (runnable) { 
      try { 
       Socket socket = serverSocket.accept(); 
       logger.info("Connected on port " + socket.getLocalPort()); 
       mediator.putIntoQueue(socket); 
      } catch (SocketException e) { 
       logger.info("Stopped listening on port " + listeningPort); 
      } 
     } 

     return null; 
    } 

    public void interrupt() { 
     try { 
      runnable = false; 
      serverSocket.close(); 
     } catch (IOException e) { 
      logger.error(e); 
     } 
    } 

} 

public class SocketConsumer implements Colleague<Socket> { 
    private static final Logger logger = getLogger(SocketConsumer.class.getName()); 
    private Mediator<Socket> socketMediator; 

    public SocketConsumer(Mediator<Socket> mediator) { 
     this.socketMediator = mediator; 
    } 

    public Void call() throws Exception { 
     while (!Thread.currentThread().isInterrupted()) { 
      try { 
       Socket socket = socketMediator.takeFromQueue(); 
       logger.info("Received socket on port: " + socket.getLocalPort()); 
      } catch (InterruptedException e) { 
       logger.info("Interrupted."); 
       Thread.currentThread().interrupt(); 
      } 
     } 
     return null; 
    } 

    public void interrupt() { 
     Thread.currentThread().interrupt(); 
    } 

} 
+3

我現在可以看到的第一個顯而易見的事情是,你在'runnable'字段旁邊缺少'volatile'。由於您嘗試中斷另一個線程的線程,因此您需要確保字段更改的可見性。我也會把它變成普通的'boolean',而不是'Boolean'。 – 2014-09-01 20:13:17

+0

也許我可以將Socket和ServerSocket封裝到它們自己的類中,這將允許我創建它們的獨特毒藥實例嗎? – CodeShaman 2014-09-01 20:23:18

+0

如果你調用'ServerSocket#close()',那麼阻塞的'accept()'調用會拋出一個異常......這樣做有幫助嗎? – 2014-09-01 20:30:33

回答

2

我認爲毒丸只會讓事情變得更加複雜,所以我會保持簡單。

至於ServerSocket,this answer建議調用close()應該足以中斷它。

至於BlockingQueue,消費者可以是這樣的:

// you can use volatile flag instead if you like 
while (!Thread.currentThread.isInterrupted()) { 
    try { 
     Object item = queue.take(); 
     // do something with item 
    } catch (InterruptedException e) { 
     log.error("Consumer interrupted", e); 
     Thread.currentThread().interrupt(); // restore flag 
    } 
} 

然後在你的Mediator你可以調用一個消費者線程interrupt()

+0

關閉ServerSocket將不會產生IMO作爲[TranferQueue.transfer(E)-method](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TransferQueue .html#transfer%28E%29)似乎是阻止順序。關閉套接字將會關閉套接字將提供的流,但不會將套接字從隊列中移除。因此,無論是中介(隊列)需要中斷還是替代阻塞語句,都可以使用輪詢/阻塞超時方法。 – 2014-09-01 20:45:31

+0

@RomanVottner在生產者中實際上有兩個阻塞操作(套接字和隊列),都需要處理。 – 2014-09-01 20:49:15

+0

我發誓我已經嘗試過這個,但突然它奇妙地工作。我想我希望有一種解決方案適用於這兩種情況,但幸好我可以解決這個問題。 消費者中斷乾淨,但我必須使用一個runnable標誌並關閉生產者的套接字。它完全忽略了線程中斷。 – CodeShaman 2014-09-01 20:54:51

1

毒丸是直線前進。

private static Socket poisonPill = new Socket(); 

public Void call() throws Exception { 
    while (runnable) { 
     Socket socket = mediator.takeFromQueue(); // blocks until element is in queue 
     if (socket == poisonPill) { 
      // quit the thread... 
     } 
    } 

    return null; 
} 

注意socket == poisonPill。這是一個平等檢查,他們是完全相同的實例,所以這是poisonPill的工作方式仍然是類型安全的。