2017-04-13 23 views
1

我正在運行一個簡單的測試,其中我從4個線程和訂戶中向TopicProcessor發佈消息,我只是將它們添加到集合中。代碼如下:在反應堆3中丟失來自TopicProcessor的消息

@Test 
public void testProcessingMessages() throws Exception { 
    int numberOfMessages = 1000; 

    TopicProcessor<Integer> processor = TopicProcessor.create(); 

    ExecutorService executorService = Executors.newFixedThreadPool(4); 

    Queue<Integer> messages = new ConcurrentLinkedQueue<>(); 

    processor.subscribe(messages::add); 

    AtomicInteger counter = new AtomicInteger(0); 
    for (int i = 0; i < numberOfMessages; i++) { 
     executorService.submit(() -> { 
      processor.onNext(counter.incrementAndGet()); 
     }); 
    } 

    Thread.sleep(10000); 

    assertEquals(numberOfMessages, messages.size()); 
} 

但最後斷言失敗,通常在980-990實際的消息,而不是預期的1000 我缺少的東西?

回答

3

問題是TopicProcessor.create創建了一個處理器,期望從單個線程發佈。從多個線程生產時應使用TopicProcessor.share