2016-06-13 30 views
2

問題說明:識別來自整數的無限流的兩個連續整數,其中這些整數由多個生產者生成,但單個消費者在同一個數再次重複時生成警報。同一執行器服務中的多個生產者和單個消費者

我有多個Producers和單Consumer。如果我將消費者提交給ExecutorService,則消費者未啓動。但是,如果我在單獨的線程中運行Consumer,則Consumer線程將按預期啓動。

代碼:

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.locks.ReentrantLock; 
import java.util.Iterator; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ExecutorService; 

public class FixedBlockingQueue { 
    final BlockingQueue<Integer> queue; 
    private int capacity; 

    public FixedBlockingQueue(int capacity){ 
     super(); 
     this.capacity = capacity; 
     queue = new ArrayBlockingQueue<Integer>(capacity); 
     System.out.println("Capactiy:"+this.capacity); 
    } 
    public void addElement(Integer element){ 
     try{ 
      queue.put(element); 
     }catch(Exception err){ 
      err.printStackTrace(); 
     } 
    } 
    public void startThreads(){ 
     ExecutorService es = Executors.newFixedThreadPool(1); 
     for (int i =0; i < 10; i++){ 
      es.submit(new MyProducer(this)); 
     } 
     //es.submit(new MyConsumer(queue)); 
     new Thread(new MyConsumer(this)).start(); 
    } 
    public BlockingQueue<Integer> getQueue(){ 
     return queue; 
    } 
    public static void main(String args[]){ 
     FixedBlockingQueue f = new FixedBlockingQueue(1); 
     f.startThreads(); 
    } 
} 

class MyProducer implements Runnable{ 

    private FixedBlockingQueue queue; 
    public MyProducer(FixedBlockingQueue queue){ 
     this.queue = queue;  
    } 
    public void run(){ 
     for (int i=1; i< 5; i++){ 
      queue.addElement(new Integer(i)); 
      System.out.println("adding:"+i); 
     } 
    } 
} 

class MyConsumer implements Runnable{ 
    private BlockingQueue<Integer> queue; 
    Integer firstNumber = 0; 
    private final ReentrantLock lock = new ReentrantLock(); 

    public MyConsumer(FixedBlockingQueue fQueue){ 
     this.queue = fQueue.getQueue(); 
    } 
    /* TODO : Compare two consecutive integers in queue are same or not*/ 
    public void run(){ 
     Integer secondNumber = 0; 
     while (true){ 
      try{ 
       lock.lock(); 
       System.out.println("queue size:"+queue.size()); 
       if (queue.size() > 0) { 
        secondNumber = queue.remove(); 
        System.out.println("Removed:"+secondNumber); 
        System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber); 
        if (firstNumber.intValue() == secondNumber.intValue()){ 
         System.out.println("Numbers matched:"+firstNumber); 
        } 
        firstNumber = secondNumber; 
       } 
       Thread.sleep(1000); 
      }catch(Exception err){ 
       err.printStackTrace(); 
      }finally{ 
       lock.unlock(); 
      } 
     } 
    } 
} 

輸出:

Capactiy:1 
adding:1 

如果我從

es.submit(new MyConsumer(queue)); 
//new Thread(new MyConsumer(queue)).start(); 

更改代碼以

//es.submit(new MyConsumer(queue)); 
new Thread(new MyConsumer(queue)).start(); 

消費者線程正常啓動。

輸出:

Capactiy:1 
adding:1 
queue size:1 
Removed:1 
Numbers:Num1:Num2:0:1 
adding:2 
queue size:1 
Removed:2 
Numbers:Num1:Num2:1:2 
adding:3 
queue size:1 
Removed:3 
Numbers:Num1:Num2:2:3 
adding:4 
queue size:1 
Removed:4 
Numbers:Num1:Num2:3:4 
adding:1 
queue size:1 
Removed:1 
Numbers:Num1:Num2:4:1 
adding:2 
queue size:1 
Removed:2 
adding:3 
Numbers:Num1:Num2:1:2 
queue size:1 
Removed:3 
Numbers:Num1:Num2:2:3 

在第一種方法:

我知道這個編號不被消費者食用,但它不應該阻止提交的其他Producer任務理想。

如果是這種情況,那麼使用ExecutorService作爲簡單Threads的替代品不可能達到100%?

回答

2

您使用單個線程創建線程池,並使用固定容量1創建一個BlockingQueue。然後向池中提交三個任務:前兩個每個嘗試將每個值排入五個值,然後將任何值可用。

因爲你的固定大小的池只有一個線程,您提交給它的任務將按順序運行,而不是並行。您首先提交生產者任務,因此它首先運行。但是一旦它排入第一個數字,它就無法取得進一步的進展,因爲隊列已滿。並且隊列仍會永久保留,因爲生產者任務必須在混合線程可用於其他任務(如消費者)之前完成。

我不確定你爲什麼使用線程池,因爲直接進行線程管理並不困難,特別是因爲你的任務已經實現Runnable。如果您使用游泳池,但是,請確保它有足夠的線程在它能夠同時容納所有的任務。

還要注意,BlockingQueue實現應該是線程安全的,標準庫提供的所有實際上都是如此。因此,您不需要在addElement()中執行自己的鎖定。此外,如果您確實需要執行自己的鎖定,那麼您不僅需要在排隊元素時執行鎖定,還需要執行鎖定時將其排除。

此外,您有生產者任務通過FixedBlockingQueue實例間接地將元素添加到底層隊列是非常奇怪的,但您將消費者任務直接轉到底層隊列。

而你的FixedBlockingQueue類的名稱選擇不好,因爲它暗示該類實現BlockingQueue,但該類實際上並不這樣做。

+0

我必須按到達順序插入元素,因此選擇與ReentrantLock單線程。 100多個線程產生整數,我必須按照到達順序檢查兩個連續的整數。 –

+0

@Ravindrababu,''BlockingQueue'是線程安全的「的一部分,你會認爲額外的鎖定會貢獻任何你不會從'BlockingQueue'中自動獲得的東西? –

+0

我稍後修復了消費者部分,它接受與Producer相同的類。 –

相關問題