1
我正在運行一個簡單的測試,其中我從4個線程和訂戶中向TopicProcessor發佈消息,我只是將它們添加到集合中。代碼如下:在反應堆3中丟失來自TopicProcessor的消息
@Test
public void testProcessingMessages() throws Exception {
int numberOfMessages = 1000;
TopicProcessor<Integer> processor = TopicProcessor.create();
ExecutorService executorService = Executors.newFixedThreadPool(4);
Queue<Integer> messages = new ConcurrentLinkedQueue<>();
processor.subscribe(messages::add);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < numberOfMessages; i++) {
executorService.submit(() -> {
processor.onNext(counter.incrementAndGet());
});
}
Thread.sleep(10000);
assertEquals(numberOfMessages, messages.size());
}
但最後斷言失敗,通常在980-990實際的消息,而不是預期的1000 我缺少的東西?