2014-02-14 42 views
0

我將可加密提交給ExecutorCompletionService,並且好像submit()方法在提交可加密時不會阻止代碼。下面是我的代碼:CompletionService - submit()不會阻止以確保創建所有線程

ExecutorService executor = Executors.newFixedThreadPool(30); 
    BlockingQueue<Future<Data>> completionQueue = new LinkedBlockingQueue(); 
    ExecutorCompletionService<Data> completionService = new ExecutorCompletionService<Data>(executor, completionQueue); 

    while(receivingPackets) { 
     Callable<Data> splitPacketCallable = new SplitPacket(packetString); 
     completionService.submit(splitPacketCallable); 
     try { 
      // Allow submit to finish 
      TimeUnit.MILLISECONDS.sleep(50); 
     } catch (InterruptedException ex) { 
      System.out.println("Something went wrong with sleeping"); 
     } 

     try { 
      Future<Data> dataFuture = completionService.poll(); 
      if (dataFuture != null) { 
       Data data = dataFuture.get(); 
       fileWriter.writeLine(data.toString()); 
      } 

     } catch (InterruptedException ex) { 
      System.out.println("Error from poll: " + ex.toString()); 
     } catch (ExecutionException ex) { 
      System.out.println("Error from get: " + ex.toString()); 
     } 
    } 

    // Finish any remaining threads 
    while (!completionQueue.isEmpty()) { 
     try { 
      Future<Data> dataFuture = completionService.take(); 
      Data data = dataFuture.get(); 
      fileWriter.writeLine(data.toString()); 
     } catch (InterruptedException ex) { 
      System.out.println("Error from take: " + ex.toString()); 
     } catch (ExecutionException ex) { 
      System.out.println("Error from get: " + ex.toString()); 
     } 
    } 

    fileWriter.close(); 
    executor.shutdown(); 

有幾件事情需要注意:

Data的是,在一個特殊的格式存儲數據的類。 SplitPacket是一個實現Callable的類,它接收已到達的字符串並將其拆分成塊以保存在Data中。 fileWriter及其方法writeLine是一個可以從多個線程異步寫入單個文件的Runnable類。

如果我在for循環中使用sleep,我開始在輸出文件中獲得不穩定的結果。如果我每次提交Callable時都睡50毫秒,那麼一切正常。但是,如果我提交一個較低的值(比如說0-5毫秒),我開始在輸出中丟棄線程。對我而言,這意味着ExecutorCompletionServicesubmit()方法不會塊。但是,因爲阻止提交的可調用函數似乎很重要,所以我也假設我只是在執行這個錯誤。

在我的情況下,我不知道會有多少數據包進來,所以我需要能夠連續添加可執行文件到執行程序。我已經嘗試過使用for循環而不是while循環,以便我可以發送給定數量的數據包,並查看它們是否打印在另一端,如果提交後有延遲,我只能讓它們通過。

有沒有辦法解決這個問題,而不添加hack-y延遲?

+0

這很雜亂。而不是輪詢你應該可能會採取。這應該允許您刪除睡眠位。另外:提交做**不**設計。等。 – assylias

+0

'poll()'超過'poll()'的問題是'take()'阻塞,直到線程完成。這會導致隊列丟失。 – tomsrobots

回答

0

如果您查看ExecutorCompletionService的來源,您會看到在任務標記爲完成之後,期貨將被添加到completionQueue

private class QueueingFuture extends FutureTask<Void> { 
    QueueingFuture(RunnableFuture<V> task) { 
     super(task, null); 
     this.task = task; 
    } 
    protected void done() { completionQueue.add(task); } 
    private final Future<V> task; 
} 

您可能有一個空的隊列,但仍然運行任務。 你可以做的最簡單的事情就是計算任務。

int count = 0; 
while(receivingPackets) { 
    ... 
    completionService.submit(splitPacketCallable); 
    ++count; 
    ... 
    try { 
     Future<Data> dataFuture = completionService.poll(); 
     if (dataFuture != null) { 
      --count; 
      ... 
     } 
    ... 
} 

// Finish any remaining threads 
while (count-- > 0) { 
    ...  
} 
+0

謝謝。這似乎對我有效。我希望有一個更好的方法,而不僅僅是手動計算線程,但我想這個解決方案可以。 – tomsrobots

相關問題