這是我第一次在Java中創建一個多線程應用程序,它將持續運行直到取消,並且無法關閉/中斷我的線程。如何中斷循環中阻塞的可調用線程?
我有一些線程與Mediator進行通信,該Mediator封裝了TransferQueue和ExecutorService,並且促進了生產線程和消費線程之間的通信。
我使用這個調解器而不是Future,因爲TransferQueue對於處理多個生產者的單個消費者(生產者線程可以隨時調用mediator.put(E e))和消費者線程可以在E e = mediator.take()上等待某些東西可用),並且我不想浪費CPU週期輪詢。
設計非常乾淨,快速和有效,但是我在中斷阻塞queue.take(),serverSocket.accept()並中斷整個線程時遇到了問題。
生產者:
public class SocketProducer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
while (runnable) {
Socket socket = serverSocket.accept(); // blocks until connection
mediator.putIntoQueue(socket);
}
return null;
}
public void interrupt() {
// ?
runnable = false;
serverSocket.close();
// ?
}
}
和消費者:
private class SocketConsumer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private volatile boolean runnable = true;
public SomeConsumer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Void call() throws Exception {
while (runnable) {
Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
}
return null;
}
public void interrupt() {
// ?
runnable = false;
// ?
}
}
同事接口只是擴展可贖回,給一些額外的功能給中介管理其生產/消費者同事(即:要求:每個同事。中斷())。
我已經嘗試了很多方法,在各個地方拋出InterruptedException,在各個地方捕獲InterruptedException,讓線程返回其中斷的中介的線程實例。我所嘗試過的所有東西都是如此無效,以至於我覺得我錯過了這個難題的一些關鍵部分。
到目前爲止,我所見過的最有效的方法是毒丸(如果隊列沒有將NPE置於空插入狀態,這將非常棒),以及我嘗試引入毒物的所有方法由於ClassCastException(嘗試將Object轉換爲套接字,試圖實例化一個通用套接字等)而失敗。
我真的不知道該從哪裏出發。我真的希望能夠按需要乾淨地終止這些線程。
完成的解決方案:
public class SocketProducer implements Colleague<Socket> {
private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
logger.info("Listening on port " + listeningPort);
while (runnable) {
try {
Socket socket = serverSocket.accept();
logger.info("Connected on port " + socket.getLocalPort());
mediator.putIntoQueue(socket);
} catch (SocketException e) {
logger.info("Stopped listening on port " + listeningPort);
}
}
return null;
}
public void interrupt() {
try {
runnable = false;
serverSocket.close();
} catch (IOException e) {
logger.error(e);
}
}
}
public class SocketConsumer implements Colleague<Socket> {
private static final Logger logger = getLogger(SocketConsumer.class.getName());
private Mediator<Socket> socketMediator;
public SocketConsumer(Mediator<Socket> mediator) {
this.socketMediator = mediator;
}
public Void call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
Socket socket = socketMediator.takeFromQueue();
logger.info("Received socket on port: " + socket.getLocalPort());
} catch (InterruptedException e) {
logger.info("Interrupted.");
Thread.currentThread().interrupt();
}
}
return null;
}
public void interrupt() {
Thread.currentThread().interrupt();
}
}
我現在可以看到的第一個顯而易見的事情是,你在'runnable'字段旁邊缺少'volatile'。由於您嘗試中斷另一個線程的線程,因此您需要確保字段更改的可見性。我也會把它變成普通的'boolean',而不是'Boolean'。 – 2014-09-01 20:13:17
也許我可以將Socket和ServerSocket封裝到它們自己的類中,這將允許我創建它們的獨特毒藥實例嗎? – CodeShaman 2014-09-01 20:23:18
如果你調用'ServerSocket#close()',那麼阻塞的'accept()'調用會拋出一個異常......這樣做有幫助嗎? – 2014-09-01 20:30:33