我有一些服務,既從入站隊列中消耗,又產生到一些出站隊列(由此服務創建的另一個線程拾取消息並將它們「傳輸」到目的地)。如何在Java中運行輪詢阻止消費者?
目前我使用兩個普通的Thread
s,如下面的代碼所示,但我知道一般情況下不應該再使用它們,而應該使用ExecutorService
等更高級別的抽象。
這對我而言是否合理?更具體地說,我的意思是 - >
- 它會減少代碼嗎?
- 使代碼在失敗的情況下更健壯?
- 允許更平滑的線程終止? (這在運行測試時是有幫助的)
我在這裏錯過了一些重要的東西嗎?
// called on service startup
private void init() {
// prepare everything here
startInboundWorkerThread();
startOutboundTransporterWorkerThread();
}
private void startInboundWorkerThread() {
InboundWorkerThread runnable = injector.getInstance(InboundWorkerThread.class);
inboundWorkerThread = new Thread(runnable, ownServiceIdentifier);
inboundWorkerThread.start();
}
// this is the Runnable for the InboundWorkerThread
// the runnable for the transporter thread looks almost the same
@Override
public void run() {
while (true) {
InboundMessage message = null;
TransactionStatus transaction = null;
try {
try {
transaction = txManager.getTransaction(new DefaultTransactionDefinition());
} catch (Exception ex) {
// logging
break;
}
// blocking consumer
message = repository.takeOrdered(template, MESSAGE_POLL_TIMEOUT_MILLIS);
if (message != null) {
handleMessage(message);
commitTransaction(message, transaction);
} else {
commitTransaction(transaction);
}
} catch (Exception e) {
// logging
rollback(transaction);
} catch (Throwable e) {
// logging
rollback(transaction);
throw e;
}
if (Thread.interrupted()) {
// logging
break;
}
}
// logging
}
// called when service is shutdown
// both inbound worker thread and transporter worker thread must be terminated
private void interruptAndJoinWorkerThread(final Thread workerThread) {
if (workerThread != null && workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException e) {
// logging
}
}
}