我將可加密提交給ExecutorCompletionService,並且好像submit()
方法在提交可加密時不會阻止代碼。下面是我的代碼:CompletionService - submit()不會阻止以確保創建所有線程
ExecutorService executor = Executors.newFixedThreadPool(30);
BlockingQueue<Future<Data>> completionQueue = new LinkedBlockingQueue();
ExecutorCompletionService<Data> completionService = new ExecutorCompletionService<Data>(executor, completionQueue);
while(receivingPackets) {
Callable<Data> splitPacketCallable = new SplitPacket(packetString);
completionService.submit(splitPacketCallable);
try {
// Allow submit to finish
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ex) {
System.out.println("Something went wrong with sleeping");
}
try {
Future<Data> dataFuture = completionService.poll();
if (dataFuture != null) {
Data data = dataFuture.get();
fileWriter.writeLine(data.toString());
}
} catch (InterruptedException ex) {
System.out.println("Error from poll: " + ex.toString());
} catch (ExecutionException ex) {
System.out.println("Error from get: " + ex.toString());
}
}
// Finish any remaining threads
while (!completionQueue.isEmpty()) {
try {
Future<Data> dataFuture = completionService.take();
Data data = dataFuture.get();
fileWriter.writeLine(data.toString());
} catch (InterruptedException ex) {
System.out.println("Error from take: " + ex.toString());
} catch (ExecutionException ex) {
System.out.println("Error from get: " + ex.toString());
}
}
fileWriter.close();
executor.shutdown();
有幾件事情需要注意:
Data
的是,在一個特殊的格式存儲數據的類。 SplitPacket
是一個實現Callable的類,它接收已到達的字符串並將其拆分成塊以保存在Data
中。 fileWriter
及其方法writeLine
是一個可以從多個線程異步寫入單個文件的Runnable類。
如果我在for循環中使用sleep,我開始在輸出文件中獲得不穩定的結果。如果我每次提交Callable時都睡50毫秒,那麼一切正常。但是,如果我提交一個較低的值(比如說0-5毫秒),我開始在輸出中丟棄線程。對我而言,這意味着ExecutorCompletionService
的submit()
方法不會塊。但是,因爲阻止提交的可調用函數似乎很重要,所以我也假設我只是在執行這個錯誤。
在我的情況下,我不知道會有多少數據包進來,所以我需要能夠連續添加可執行文件到執行程序。我已經嘗試過使用for循環而不是while循環,以便我可以發送給定數量的數據包,並查看它們是否打印在另一端,如果提交後有延遲,我只能讓它們通過。
有沒有辦法解決這個問題,而不添加hack-y延遲?
這很雜亂。而不是輪詢你應該可能會採取。這應該允許您刪除睡眠位。另外:提交做**不**設計。等。 – assylias
'poll()'超過'poll()'的問題是'take()'阻塞,直到線程完成。這會導致隊列丟失。 – tomsrobots