2014-03-12 84 views
0

我有一個海量數據集,我需要將其填充到數據庫中。我正在編寫一個基於Java併發庫的代碼(包含BlockingQueue和executorService的生產者 - 消費者模型),它可以在隊列到達時隨時向隊列中添加數據。消費者不斷檢索數據,除非它遇到「毒藥」(然後死亡)。從BlockingQueue提取時丟失的項目

主要類,將發佈虛擬數據。隊列長度是故意保持較小:

public class MessageProcessor { 
private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
     5, true); 
private static final ExecutorService executor = Executors 
     .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 
private static final ExecutorService consumerExecutor = Executors 
     .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 
private static final String POISON = "THE_END"; 


public void processMessages() throws InterruptedException { 

//Create and start consumer 
    Runnable consumer = new MessageConsumer(queue); 
    consumerExecutor.execute(consumer); 

    for (String payload : getPayload()) { 
     //create and start producer with given payload 
     Runnable producer = new MessageProducer(queue, payload); 
     executor.execute(producer); 
    } 

    executor.shutdown(); 
    executor.awaitTermination(1, TimeUnit.MINUTES); 

    consumerExecutor.shutdown(); 
    consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); 

} 

private List<String> getPayload() { 
    List<String> payloads = new ArrayList<>(); 
    payloads.add("data1"); 
    payloads.add("data2"); 
    payloads.add("data3"); 
    payloads.add("data4"); 
    payloads.add("data5"); 
    payloads.add("data6"); 
    payloads.add("data7"); 
    payloads.add("data8"); 
    payloads.add("data9"); 
    payloads.add("data10"); 
    payloads.add(POISON); 

    return payloads; 
}} 

生產者的Runnable:

public class MessageProducer implements Runnable { 
private BlockingQueue<String> queue; 
private String payload; 

public MessageProducer(BlockingQueue<String> queue, String payload) { 
    this(); 
    this.queue = queue; 
    this.payload = payload; 
} 

private MessageProducer() { 
} 

public void run() { 
    try { 
      queue.put(payload); 
      System.out.println("Put : " + payload); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
}} 

消費者的Runnable:

public class MessageConsumer implements Runnable { 

private BlockingQueue<String> queue; 
private static final String POISON = "THE_END"; 

public MessageConsumer(BlockingQueue<String> queue) { 
    this(); 
    this.queue = queue; 
} 

private MessageConsumer() { 
} 

public void run() { 

    String payload = ""; 
    do { 
     try { 
      payload = queue.take(); 
      System.out.println("Got : " + payload); 
     } catch (InterruptedException ie) { 
      // handle 
      break; 
     } 
    } while (!payload.equals(POISON)); 
}} 

輸出:

Put : data1 
Put : data2 
Put : data3 
Put : data7 
Put : data6 
Put : data5 
Got : data1 
Got : data2 
Got : data3 
Got : data5 
Put : data10 
Put : data8 
Put : data9 
Got : data6 
Got : data7 
Put : data4 
Put : THE_END 
Got : data8 
Got : data9 
Got : data10 
Got : THE_END 

當我執行新MessageProcessor.processMessages (),我觀察到兩種異常:

  1. 消費者無法取得一個項目:DATA4(我假設,因爲它獲取中毒數據(「THE_END」)「數據4」可以檢索前) - 但爲什麼它不從FIFO隊列的插入順序獲取數據?在隊列
  2. 插入(放)在項目的順序不會發生在列表(例如,「數據3」後,「數據7」插入)

謝謝!

回答

2

你的兩個問題是一樣的。

由於您有多個生產者並行運行,您不能保證第一個生產者會在第二個生產者之前將其元素放入隊列中。所以物品在隊列中不會順序排列,並且毒物在data4之前,因此消費者不會消費。

+0

謝謝 - 通過從列表中刪除毒物和直接添加毒物生產者執行者終止後的隊列。 – felixmd

1

將條目放入隊列是非確定性的,因爲您爲每條消息使用一個唯一的可運行對象,而不是在單個線程中按順序循環。

正如你所說,這可能解釋了爲什麼有些消息沒有看到,因爲他們會在結束後。

+0

是的,。我的具體情況,我必須處理線程的父母(一個單獨的線程)中的數據 - 雖然,如你所說,這解釋了上述行爲。 – felixmd

1

你的隊列是一個fifo,是的,但你並不是按照fifo順序將數據推送到隊列中。

如果.availableProcessors()返回> 1,你有幾個生產商將數據推到隊列 - 你的遺囑執行人管理線程沒有在針對你打電話executor.execute(producer);的順序依次運行保證