2011-08-08 37 views
1

我從ExecutorCompletionService有奇怪的行爲。該項目被添加到ExecutorCompletionService.submit()罰款。然後它得到處理,並由之前提交的Callable工作線程返回。之後返回ExecutorCompletionService.take()從來沒有看到它,所以從來沒有看到阻塞返回項目?我真的不確定發生了什麼事。我已經創建了打印線,並可以看到它完成了Callable工作線程。一旦發生這種情況ExecutorCompletionService.take應該準備好採取,但在某些情況下,事情鎖定,有時它的罰款?ExecutorCompletionService不接受由Callable返回的項目嗎?

我創建了一個測試情況下,如果你跑了幾次,你會看到它會在某些情況下,鎖定並從未採取任何成品線程

ThreadDeadlockDemo

import java.util.Observable; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 

public class ThreadDeadlockDemo extends Observable implements Runnable { 

private CompletionService<String> pool; 
private ExecutorService executor ; 
private Thread responseWorkerThread; 
private HttpSchedulerWorker schedulerWorker; 
private boolean shouldRun = true; 
private int numThreadsInPool; 
private BlockingQueue<String> queue; 
public ThreadDeadlockDemo(int numThreads) 
{ 
    numThreadsInPool = numThreads; 
    executor = Executors.newFixedThreadPool(numThreads); 
    pool = new ExecutorCompletionService<String>(executor); 
    schedulerWorker = new HttpSchedulerWorker(); 
    responseWorkerThread = new Thread(schedulerWorker); 
    responseWorkerThread.start(); 
    queue = new LinkedBlockingQueue<String>(); 
    new Thread(this).start(); 
} 

public ThreadDeadlockDemo() 
{ 
    numThreadsInPool = 1; 
    executor = Executors.newFixedThreadPool(1); 
    pool = new ExecutorCompletionService<String>(executor); 
    schedulerWorker = new HttpSchedulerWorker(); 
    responseWorkerThread = new Thread(schedulerWorker); 
    responseWorkerThread.start(); 
    queue = new LinkedBlockingQueue<String>(); 
    new Thread(this).start(); 
} 

public void setThreadCount(int numThreads) 
{ 
    executor = Executors.newFixedThreadPool(numThreads); 
    pool = new ExecutorCompletionService<String>(executor); 
    numThreadsInPool = numThreads; 
} 

public void add(String info) { 
    queue.add(info); 
} 

@Override 
public void run() { 
    // TODO Auto-generated method stub 
    while(shouldRun) 
    { 
     try { 
      String info = queue.take(); 
      Callable<String> worker = new WorkerThread(info); 
      System.out.println("submitting to pooler: " + info); 
      pool.submit(worker); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

/** 
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they 
* are complete it will send them to server for completion. 
* @author Steve 
* 
*/ 
class HttpSchedulerWorker implements Runnable{ 

    public void run() { 
     // TODO Auto-generated method stub 
     while(true) 
     { 
      String vulnInfo = null; 
      try { 
       Future<String> tmp = pool.take(); 
      // Future<VulnInfo> tmp = pool.poll(); 
       if(tmp != null) 
        vulnInfo = tmp.get(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 

      if(vulnInfo != null) 
      { 
       System.out.println("info was taken from pool completed: " + vulnInfo); 
      } 



     } 
    } 

} 

}

WorkerClass:這是線程工作者添加到執行程序池並返回,但在某些情況下永遠不會在ThreadlockDemos ExecutorCompletionService池中收到通知?

import java.util.concurrent.Callable; 

public class WorkerThread implements Callable<String>{ 


String info; 
WorkerThread(String info) 
{ 
    this.info = info; 
} 

//@Override 
public String call() throws Exception { 
    System.out.println("sending vuln info: " + info); 
    return info; 
} 


} 

這裏是我的測試類只是將項目添加到隊列。這是從我的控制檯打印出來的一張看起來失敗的照片。它添加到隊列中並返回值。但是take()從來沒有被稱爲任何想法爲什麼?它有時有效,有時會失敗,使我很難看出錯誤。我很想用java來說它的錯誤,但是我環顧四周並沒有看到這些類的任何問題?

public class HttpSchedulerThreadedUnitTest { 

ThreadDeadlockDemo scheduler; 
public HttpSchedulerThreadedUnitTest(){ 

    setupScheduler(); 
    for(int i=0; i < 5;i++) 
    { 
     scheduler.add(i+""); 
    } 
} 

private void setupScheduler() 
{ 
    scheduler = new ThreadDeadlockDemo(); 
    scheduler.setThreadCount(1); 
} 

public static void main(String[] args) 
{ 
    new HttpSchedulerThreadedUnitTest(); 
} 

}

控制檯打印:這是運行它不會從池中取的WorkerThread完成 提交普勒時:0 提交給波爾:1 提交給波爾:2 發送vuln信息:0 提交池程序:3 發送vuln信息:1 提交池程序:4 發送vuln信息:2 發送vuln信息:3 發送vuln信息:4

控制檯打印:它實際上正在從游泳池返回itunes! 提交普勒:0 提交普勒:1 提交普勒:2 提交普勒:3 提交普勒:4 發送vuln信息:0 信息是從庫中取出完成:0 發送vuln信息: 1個 信息是從庫中取出已完成:1 發送vuln信息:2 信息是從庫中取出完成:2 發送vuln信息:3 信息是從庫中取出完成:3 發送vuln信息:4 信息取從池中完成:4

回答

0

這是很多代碼。如果你可以減少它(通過刪除http相關部分等),這將是很好的。我也不確定你的意思After that return the ExecutorCompletionService.take never sees it so never sees the blocking to return anymore items?

你可以採取一個線程轉儲,當它鎖定,看看哪個線程被鎖定在代碼的哪一點。

同時,我看到一些看起來不對的代碼。

while(requestQueue.isEmpty()){ 
      try { 
       synchronized(this) 
       { 
        wait(); 
       } 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      } 

在這裏,您正在同步一個可運行的對象。這幾乎總是錯誤的,因爲可運行對象通常不能被多個線程訪問。您還正在測試synchronized語句之外的條件。通常你使用等待如下:

synchronized(lock){ 
    while(!condition){ 
     wait(); 
    } 
} 

但是,我沒有看到任何代碼調用可運行的通知。這可能會導致程序掛起。基本上你在等待什麼,但沒有人叫醒你,所以你無限期地等待。無論這是您面臨的問題的原因,可以通過在發生這種情況時查看線程轉儲來輕鬆確定。

如果您使用的是隊列,這裏最好的建議是對請求隊列使用阻塞隊列。這樣你就不必這樣做等待/完全通知企業。

+0

我清理了代碼並使其可以運行以測試沒有依賴關係。我有它添加並返回一個字符串。我按照你所說的做了,並添加了一個阻塞隊列。這有助於清理代碼感謝,但仍然有相同的死鎖問題。任何想法可能會在這段代碼中陷入僵局?看起來很簡單,我沒有看到任何不正確的實施? – steve

+0

我忘了提及我做了一個線程轉儲,但無法從提供的數據中確定任何東西。也許其他人可以理解它: – steve

+0

線程轉儲:ame:pool-2-thread-1 狀態:WAITING上java.u[email protected]5fcf29 總阻塞:1等待總數:1 堆棧跟蹤: sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(Unknown Source) java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject.await(Unknown Source) java.util.concurrent.LinkedBlockingQueue.take(來源不明) java.util.concurrent.ThreadPoolExecutor.getTask(來源不明) java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(來源不明) 的java.lang.Thread .RU n(未知源) – steve