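Items missed when retrieving from a BlockingQueue
I have a very large data set that I need to load into a database. I am writing code based on the Java concurrency library (a producer-consumer model using BlockingQueue and ExecutorService) that adds data to the queue as it arrives. The consumer keeps retrieving data until it encounters the "poison" item, at which point it dies.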
The main class, which publishes dummy data. The queue capacity is deliberately kept small:
public class MessageProcessor {
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
            5, true);
    private static final ExecutorService executor = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static final ExecutorService consumerExecutor = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static final String POISON = "THE_END";

    public void processMessages() throws InterruptedException {
        // Create and start consumer
        Runnable consumer = new MessageConsumer(queue);
        consumerExecutor.execute(consumer);
        for (String payload : getPayload()) {
            // Create and start producer with given payload
            Runnable producer = new MessageProducer(queue, payload);
            executor.execute(producer);
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        consumerExecutor.shutdown();
        consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }

    private List<String> getPayload() {
        List<String> payloads = new ArrayList<>();
        payloads.add("data1");
        payloads.add("data2");
        payloads.add("data3");
        payloads.add("data4");
        payloads.add("data5");
        payloads.add("data6");
        payloads.add("data7");
        payloads.add("data8");
        payloads.add("data9");
        payloads.add("data10");
        payloads.add(POISON);
        return payloads;
    }
}
The producer Runnable:
public class MessageProducer implements Runnable {
    private BlockingQueue<String> queue;
    private String payload;

    public MessageProducer(BlockingQueue<String> queue, String payload) {
        this();
        this.queue = queue;
        this.payload = payload;
    }

    private MessageProducer() {
    }

    public void run() {
        try {
            queue.put(payload);
            System.out.println("Put : " + payload);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The consumer Runnable:
public class MessageConsumer implements Runnable {
    private BlockingQueue<String> queue;
    private static final String POISON = "THE_END";

    public MessageConsumer(BlockingQueue<String> queue) {
        this();
        this.queue = queue;
    }

    private MessageConsumer() {
    }

    public void run() {
        String payload = "";
        do {
            try {
                payload = queue.take();
                System.out.println("Got : " + payload);
            } catch (InterruptedException ie) {
                // handle
                break;
            }
        } while (!payload.equals(POISON));
    }
}
Output:
Put : data1
Put : data2
Put : data3
Put : data7
Put : data6
Put : data5
Got : data1
Got : data2
Got : data3
Got : data5
Put : data10
Put : data8
Put : data9
Got : data6
Got : data7
Put : data4
Put : THE_END
Got : data8
Got : data9
Got : data10
Got : THE_END
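When I run new MessageProcessor().processMessages(), I observe two anomalies:
- The consumer never gets one item: data4 (I assume because it takes the poison item ("THE_END") before "data4" can be retrieved). But why does it not take the data from the queue in FIFO insertion order?
- The items are not inserted (put) into the queue in the order of the list (e.g. "data7" is inserted right after "data3").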
Thanks!
Thanks - solved by removing the poison from the list and adding it to the queue directly, after the producer executor has terminated. – felixmd
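For anyone reading later, here is a minimal sketch of the fix described in the comment above, not the poster's actual code. It keeps the concurrent producers but only puts the poison item on the queue once the producer executor has terminated, so no payload can end up behind it. The class name PoisonAfterProducers, the run method, and the pool size of 4 are assumptions made for illustration; the consumer also returns what it took through a Future so the result can be checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; illustrates "poison after producers finish".
public class PoisonAfterProducers {
    private static final String POISON = "THE_END";

    // Runs one producer task per payload, then enqueues the poison item last.
    // Returns every item the consumer took before it saw the poison item.
    public static List<String> run(List<String> payloads) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true);
        ExecutorService producers = Executors.newFixedThreadPool(4); // arbitrary size
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();

        // Consumer: take items until the poison item arrives.
        Future<List<String>> result = consumerExecutor.submit(() -> {
            List<String> consumed = new ArrayList<>();
            String item;
            while (!(item = queue.take()).equals(POISON)) {
                consumed.add(item);
            }
            return consumed;
        });

        // Producers: still concurrent, so the put order is NOT the list order.
        for (String payload : payloads) {
            producers.execute(() -> {
                try {
                    queue.put(payload);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Wait until every producer has finished its put() ...
        producers.shutdown();
        if (!producers.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("producers did not finish in time");
        }
        // ... and only then add the poison item, so it is the last element.
        queue.put(POISON);

        consumerExecutor.shutdown();
        return result.get(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws Exception {
        List<String> got = run(Arrays.asList("data1", "data2", "data3", "data4",
                "data5", "data6", "data7", "data8", "data9", "data10"));
        System.out.println("Consumed " + got.size() + " items: " + got);
    }
}
```

With this arrangement the consumer should see all ten payloads, though their order can still vary from run to run because each put happens on whichever producer thread gets scheduled first. If list order matters, a single producer thread putting the items in sequence would be the simpler choice.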