2012-08-27 105 views
4

下面的java代碼示例使用java DelayQueue來處理任務。然而,從另一個線程插入任務似乎會破壞(我)預期的行爲。java.util.concurrent.DelayQueue忽略過期元素

道歉,該代碼例如是如此長,但在總結:

  1. 主線程添加5個任務(AE)與各種延遲一個DelayQueue(0毫秒,10毫秒,100毫秒1000毫秒,10000ms)
  2. 另一胎開始它增加了後3000ms
  3. 另一個任務的DelayQueue主線程輪詢DelayQueue並在每個任務報告到期
  4. 後8000MS主線程報告中剩餘的DelayQueue
  5. 任務

,我從代碼示例得到的輸出是:

------initial tasks --------------- 
task A due in 0ms 
task B due in 9ms 
task C due in 99ms 
task D due in 999ms 
task E due in 9999ms 
task F due in 99999ms 
------processing-------------------- 
time = 5 task A due in -1ms 
time = 14 task B due in 0ms 
time = 104 task C due in 0ms 
time = 1004 task D due in 0ms 
time = 3003 added task Z due in 0ms 
------remaining after 15007ms ----------- 
task F due in 84996ms 
task E due in -5003ms 
task Z due in -12004ms 

我的問題是:爲什麼後15000ms都存在過期留在DelayQueue任務(即其中GetDelay()返回一個值-ve) ?

,我查了一些事情:

  • 我已經實現的compareTo()來定義任務的自然順序
  • equals()方法是用的compareTo()
  • hashCode()方法是一貫的重寫

我會對如何解決這個問題感興趣。預先感謝您的幫助。 (和所有這些堆棧溢出的答案已經幫助我去約會了:)

package test; 

    import java.util.concurrent.DelayQueue; 
    import java.util.concurrent.Delayed; 
    import java.util.concurrent.TimeUnit; 

    public class Test10_DelayQueue { 

     private static final TimeUnit delayUnit = TimeUnit.MILLISECONDS; 
     private static final TimeUnit ripeUnit = TimeUnit.NANOSECONDS; 

     static long startTime; 

     static class Task implements Delayed {  
      public long ripe; 
      public String name;  
      public Task(String name, int delay) { 
      this.name = name; 
      ripe = System.nanoTime() + ripeUnit.convert(delay, delayUnit); 
      } 

     @Override 
     public boolean equals(Object obj) { 
     if (obj instanceof Task) { 
      return compareTo((Task) obj) == 0; 
     } 
     return false; 
     } 

     @Override 
     public int hashCode() { 
     int hash = 7; 
     hash = 67 * hash + (int) (this.ripe^(this.ripe >>> 32)); 
     hash = 67 * hash + (this.name != null ? this.name.hashCode() : 0); 
     return hash; 
     } 

     @Override 
     public int compareTo(Delayed delayed) { 
     if (delayed instanceof Task) { 
      Task that = (Task) delayed; 
      return (int) (this.ripe - that.ripe); 
     } 
     throw new UnsupportedOperationException(); 
     } 

     @Override 
     public long getDelay(TimeUnit unit) { 
     return unit.convert(ripe - System.nanoTime(), ripeUnit); 
     } 

     @Override 
     public String toString() { 
     return "task " + name + " due in " + String.valueOf(getDelay(delayUnit) + "ms"); 
      } 
     } 

     static class TaskAdder implements Runnable { 

     DelayQueue dq; 
     int delay; 

     public TaskAdder(DelayQueue dq, int delay) { 
     this.dq = dq; 
     this.delay = delay; 
     } 

     @Override 
     public void run() { 
     try { 
      Thread.sleep(delay); 

      Task z = new Task("Z", 0); 
      dq.add(z); 

      Long elapsed = System.currentTimeMillis() - startTime; 

      System.out.println("time = " + elapsed + "\tadded " + z); 

     } catch (InterruptedException e) { 
     } 
     } 
    } 

    public static void main(String[] args) { 
     startTime = System.currentTimeMillis(); 
     DelayQueue<Task> taskQ = new DelayQueue<Task>(); 

     Thread thread = new Thread(new TaskAdder(taskQ, 3000)); 
     thread.start(); 

     taskQ.add(new Task("A", 0)); 
     taskQ.add(new Task("B", 10)); 
     taskQ.add(new Task("C", 100)); 
     taskQ.add(new Task("D", 1000)); 
     taskQ.add(new Task("E", 10000)); 
     taskQ.add(new Task("F", 100000)); 

     System.out.println("------initial tasks ---------------"); 
     Task[] tasks = taskQ.toArray(new Task[0]); 
     for (int i = 0; i < tasks.length; i++) { 
     System.out.println(tasks[i]); 
     } 

     System.out.println("------processing--------------------"); 
     try { 
     Long elapsed = System.currentTimeMillis() - startTime; 
     while (elapsed < 15000) { 
      Task task = taskQ.poll(1, TimeUnit.SECONDS); 
      elapsed = System.currentTimeMillis() - startTime; 
      if (task != null) { 
       System.out.println("time = " + elapsed + "\t" + task); 
      } 
     } 

     System.out.println("------remaining after " + elapsed + "ms -----------"); 
     tasks = taskQ.toArray(new Task[0]); 
     for (int i = 0; i < tasks.length; i++) { 
      System.out.println(tasks[i]); 
     } 

     } catch (InterruptedException e) { 
     } 
    } 
    } 

回答

3

因爲你comapareTo方法是漏洞百出。正確的實施如下。一旦你改變如下所有問題將得到解決。總是試圖重用compareTo方法,如果還是堅持compareTo合同

return Long.valueOf(this.ripe).compareTo(that.ripe); 
+0

非常感謝 - 您的答案解決了我的問題。但我不明白爲什麼。 API聲明CompareTo「返回一個負整數,零或正整數,因爲該對象小於,等於或大於指定對象」。我理解重新使用Long.compareTo()的智慧,但我不明白爲什麼我的代碼不符合compareTo契約!? – nhoj

+2

原因是數值溢出。你正在將一個「長」的差異以納秒爲單位轉換爲「int」,但是在int中不能持有超過2.2秒的納秒,並且會產生一個溢出 - 給出或多或少的隨機結果,所以隊列中的訂單可能*後面*一個有一個以後到期。 poll()不會超出隊列中的下一個項目,其順序是在項目放入隊列時定義的。 – Bohemian

+1

溢出。將'long'投射到'int'可以改變它的符號。只有大約20億個積極的'int',大約2秒的納秒。你很可能有'this.ripe> that.ripe'但是'(int)(this.ripe - that.ripe)'爲負的值。 –

5

的原因是由於數字溢出。

compareTo()方法鑄造在納秒long差異int,但超過22秒鐘的納秒不能在int舉行,你會得到一個溢出 - 讓更多或更少的隨機結果,所以如果將來隊列中的訂單超過2.2秒,則隊列中的訂單可能會落後於某個隊列。

poll()不會超出隊列中的下一個項目,當項目放入隊列時,其順序由compareTo方法定義。


此外,equals()應與hashCode()同意,以及compareTo()。有關詳細信息,請參閱javadoc for hashCode()

1

除非這是實施事件調度程序的練習,否則最好使用ScheduledExecutorService。它會做你想做的所有事情,以及更多。

+0

謝謝,是的,我知道使用ScheduledExecutorService會更好。謝謝你指出。 標準實現[ScheduledThreadPoolExecutor](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html)缺少我需要的一小塊;能夠定期爲固定數量的實例制定任務。易於擴展。謝謝 – nhoj