2011-09-12 43 views
6

我需要一個字節發生器,可以生成Byte.MIN_VALUE到Byte.MAX_VALUE的值。當它達到MAX_VALUE時,它應該從MIN_VALUE重新開始。這個線程安全的字節序列發生器有什麼問題?

我已經使用AtomicInteger編寫了代碼(見下文);但是,如果同時訪問並且如果使用Thread.sleep()進行人爲緩慢(如果沒有睡眠,它運行正常;但是,我認爲它對於併發問題顯示太快),代碼看起來似乎不能正確運行。

代碼(增加了一些調試代碼):

public class ByteGenerator { 

    private static final int INITIAL_VALUE = Byte.MIN_VALUE-1; 

    private AtomicInteger counter = new AtomicInteger(INITIAL_VALUE); 
    private AtomicInteger resetCounter = new AtomicInteger(0); 

    private boolean isSlow = false; 
    private long startTime; 

    public byte nextValue() { 
     int next = counter.incrementAndGet(); 
     //if (isSlow) slowDown(5); 
     if (next > Byte.MAX_VALUE) { 
      synchronized(counter) { 
       int i = counter.get(); 
       //if value is still larger than max byte value, we reset it 
       if (i > Byte.MAX_VALUE) { 
        counter.set(INITIAL_VALUE); 
        resetCounter.incrementAndGet(); 
        if (isSlow) slowDownAndLog(10, "resetting"); 
       } else { 
        if (isSlow) slowDownAndLog(1, "missed"); 
       } 
       next = counter.incrementAndGet(); 
      } 
     } 
     return (byte) next; 
    } 

    private void slowDown(long millis) { 
     try { 
      Thread.sleep(millis); 
     } catch (InterruptedException e) { 
     } 
    } 
    private void slowDownAndLog(long millis, String msg) { 
     slowDown(millis); 
     System.out.println(resetCounter + " " 
          + (System.currentTimeMillis()-startTime) + " " 
          + Thread.currentThread().getName() + ": " + msg); 
    } 

    public void setSlow(boolean isSlow) { 
     this.isSlow = isSlow; 
    } 
    public void setStartTime(long startTime) { 
     this.startTime = startTime; 
    } 

} 

而且,測試:

public class ByteGeneratorTest { 

    @Test 
    public void testGenerate() throws Exception { 
     ByteGenerator g = new ByteGenerator(); 
     for (int n = 0; n < 10; n++) { 
      for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) { 
       assertEquals(i, g.nextValue()); 
      } 
     } 
    } 

    @Test 
    public void testGenerateMultiThreaded() throws Exception { 
     final ByteGenerator g = new ByteGenerator(); 
     g.setSlow(true); 
     final AtomicInteger[] counters = new AtomicInteger[Byte.MAX_VALUE-Byte.MIN_VALUE+1]; 
     for (int i = 0; i < counters.length; i++) { 
      counters[i] = new AtomicInteger(0); 
     } 
     Thread[] threads = new Thread[100]; 
     final CountDownLatch latch = new CountDownLatch(threads.length); 
     for (int i = 0; i < threads.length; i++) { 
      threads[i] = new Thread(new Runnable() { 
       public void run() { 
        try { 
         for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) { 
          byte value = g.nextValue(); 
          counters[value-Byte.MIN_VALUE].incrementAndGet(); 
         } 
        } finally { 
         latch.countDown(); 
        } 
       } 
      }, "generator-client-" + i); 
      threads[i].setDaemon(true); 
     } 
     g.setStartTime(System.currentTimeMillis()); 
     for (int i = 0; i < threads.length; i++) { 
      threads[i].start(); 
     } 
     latch.await(); 
     for (int i = 0; i < counters.length; i++) { 
      System.out.println("value #" + (i+Byte.MIN_VALUE) + ": " + counters[i].get()); 
     } 
     //print out the number of hits for each value 
     for (int i = 0; i < counters.length; i++) { 
      assertEquals("value #" + (i+Byte.MIN_VALUE), threads.length, counters[i].get()); 
     } 
    } 

} 

我的2核的機器上的結果是值#-128得到146次點擊(因爲我們有100個線程,它們都應該得到100次命中)。

如果有人有任何想法,這個代碼有什麼問題,我都是耳朵/眼睛。

更新:對於那些誰在趕時間,不想向下滾動,正確的(和最短和最優雅的)的方式來解決這個問題在Java中會是這樣的:

public byte nextValue() { 
    return (byte) counter.incrementAndGet(); 
} 

謝謝,亨氏!

回答

5

您根據counter.get()的舊值決定incrementAndGet()。在對計數器執行incrementAndGet()操作之前,計數器的值可以再次達到MAX_VALUE。

if (next > Byte.MAX_VALUE) { 
    synchronized(counter) { 
     int i = counter.get(); //here You make sure the the counter is not over the MAX_VALUE 
     if (i > Byte.MAX_VALUE) { 
      counter.set(INITIAL_VALUE); 
      resetCounter.incrementAndGet(); 
      if (isSlow) slowDownAndLog(10, "resetting"); 
     } else { 
      if (isSlow) slowDownAndLog(1, "missed"); //the counter can reach MAX_VALUE again if you wait here long enough 
     } 
     next = counter.incrementAndGet(); //here you increment on return the counter that can reach >MAX_VALUE in the meantime 
    } 
} 

爲了使其工作必須確保沒有決定過時的信息。重置計數器或返回舊值。

public byte nextValue() { 
    int next = counter.incrementAndGet(); 

    if (next > Byte.MAX_VALUE) { 
     synchronized(counter) { 
      next = counter.incrementAndGet(); 
      //if value is still larger than max byte value, we reset it 
      if (next > Byte.MAX_VALUE) { 
       counter.set(INITIAL_VALUE + 1); 
       next = INITIAL_VALUE + 1; 
       resetCounter.incrementAndGet(); 
       if (isSlow) slowDownAndLog(10, "resetting"); 
      } else { 
       if (isSlow) slowDownAndLog(1, "missed"); 
      } 
     } 
    } 

    return (byte) next; 
} 
+0

爲什麼在同步塊中有counter.incrementAndGet(),就像進入該部分的所有線程都有多個增量,後來它被重置爲「INITIAL_VALUE + 1」 –

3

您的同步塊只包含if正文。它應該包裝整個方法,包括if聲明本身。或者讓您的方法nextValue同步。順便說一句,在這種情況下,你根本不需要原子變量。

我希望這會對你有用。只有在確實需要最高性能代碼時才嘗試使用Atomic變量,即​​語句會讓您感到困擾。恕我直言,在大多數情況下,它不。

+0

你的回答並沒有解釋爲什麼目前的做法是行不通的。我的理解是它應該工作:當它不同步時使用原子變量,並在執行非原子操作時同步 - 爲什麼它不起作用? –

2

如果我理解正確,那麼nextValue的結果在Byte.MIN_VALUE和Byte.MAX_VALUE範圍內,並且您不關心存儲在計數器中的值。 然後你就可以在地圖字節整數,你需要枚舉的行爲暴露:

private static final int VALUE_RANGE = Byte.MAX_VALUE - Byte.MIN_VALUE + 1; 
private final AtomicInteger counter = new AtomicInteger(0); 

public byte nextValue() { 
    return (byte) (counter.incrementAndGet() % VALUE_RANGE + Byte.MIN_VALUE - 1); 
} 

當心,這是未經測試的代碼。但這個想法應該起作用。

+0

雖然這是一個非常整潔的想法(我怎麼沒有想到它--P),它實際上工作 - 它仍然不能解釋爲什麼我原來的解決方案不起作用。 –

+0

那麼,在你的原始解決方案中有以下缺陷。第一個「incrementAndGet()」和「synchronized」塊之間的上下文切換可能會發生。即兩個線程獲得超出Byte.MAX_VALUE的「next」值。其中一個線程會重置計數器,另一個會返回太大的值。 – jmg

+0

不,如果兩者的值都太大,第一個會重置計數器,下一個會從(已經重置)計數器中獲得一個新值。 –

1

我使用compareAndSet編碼了以下版本的nextValue,它被設計用於非同步塊。它通過了你的單元測試:

哦,我爲MIN_VALUE和MAX_VALUE引入了新的常量,但如果你願意,你可以忽略它們。

static final int LOWEST_VALUE = Byte.MIN_VALUE; 
static final int HIGHEST_VALUE = Byte.MAX_VALUE; 

private AtomicInteger counter = new AtomicInteger(LOWEST_VALUE - 1); 
private AtomicInteger resetCounter = new AtomicInteger(0); 

public byte nextValue() { 
    int oldValue; 
    int newValue; 

    do { 
     oldValue = counter.get(); 
     if (oldValue >= HIGHEST_VALUE) { 
      newValue = LOWEST_VALUE; 
      resetCounter.incrementAndGet(); 
      if (isSlow) slowDownAndLog(10, "resetting"); 
     } else { 
      newValue = oldValue + 1;  
      if (isSlow) slowDownAndLog(1, "missed"); 
     } 
    } while (!counter.compareAndSet(oldValue, newValue)); 
    return (byte) newValue; 
} 

compareAndSet()作品連同get()併發性的管理。

在臨界區域開始時,您執行get()以檢索舊值。然後,您執行一些僅依賴舊值的函數來計算新值。然後您使用compareAndSet()來設置新值。如果在執行compareAndSet()(由於併發活動)時AtomicInteger不再等於舊值,則它失敗,您必須重新開始。

如果您擁有極高的併發性並且計算時間很長,可以想象compareAndSet()在成功之前可能會失敗很多次,如果您擔心,可能值得收集統計數據。

我並不是說這是一個比其他人建議的簡單同步塊更好或更差的方法,但我個人可能會使用同步塊來簡化。

編輯:我會回答你的實際問題「爲什麼不工作?」

您的代碼有:

int next = counter.incrementAndGet(); 
    if (next > Byte.MAX_VALUE) { 

由於這兩條線沒有被同步塊的保護,多個線程可同時執行它們和所有獲得的next>Byte.MAX_VALUE值。然後他們全部進入同步塊,並將counter設置回INITIAL_VALUE(一個接一個地等待對方)。

多年來,在試圖通過不同步的方式獲得性能調整的缺陷方面,寫了大量的文章。例如,請參閱Double Checked Locking

+0

是的,這是錯誤的,但我也認爲你的解釋是不正確的。第二個線程不會再次將計數器復位,因爲在同步塊中有一個'int i = counter.get();'。 –

+0

雖然這看起來像雙重檢查鎖定,但它不應該遭受同樣的問題,因爲AtomicInteger應該保證所有線程都看到相同的計數器值(無緩存)。還是有更多的事情呢? –

8

最初,Java將所有字段存儲爲4或8個字節值,甚至是短和字節。對這些字段的操作只需要進行位掩碼來縮小字節。因此,我們可以很容易地做到這一點:

public byte nextValue() { 
    return (byte) counter.incrementAndGet(); 
} 

有趣的小拼圖,感謝Neeme :-)