問題說明:識別來自整數的無限流的兩個連續整數,其中這些整數由多個生產者生成,但單個消費者在同一個數再次重複時生成警報。同一執行器服務中的多個生產者和單個消費者
我有多個Producers
和單Consumer
。如果我將消費者提交給ExecutorService
,則消費者未啓動。但是,如果我在單獨的線程中運行Consumer,則Consumer線程將按預期啓動。
代碼:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class FixedBlockingQueue {
final BlockingQueue<Integer> queue;
private int capacity;
public FixedBlockingQueue(int capacity){
super();
this.capacity = capacity;
queue = new ArrayBlockingQueue<Integer>(capacity);
System.out.println("Capactiy:"+this.capacity);
}
public void addElement(Integer element){
try{
queue.put(element);
}catch(Exception err){
err.printStackTrace();
}
}
public void startThreads(){
ExecutorService es = Executors.newFixedThreadPool(1);
for (int i =0; i < 10; i++){
es.submit(new MyProducer(this));
}
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(this)).start();
}
public BlockingQueue<Integer> getQueue(){
return queue;
}
public static void main(String args[]){
FixedBlockingQueue f = new FixedBlockingQueue(1);
f.startThreads();
}
}
class MyProducer implements Runnable{
private FixedBlockingQueue queue;
public MyProducer(FixedBlockingQueue queue){
this.queue = queue;
}
public void run(){
for (int i=1; i< 5; i++){
queue.addElement(new Integer(i));
System.out.println("adding:"+i);
}
}
}
class MyConsumer implements Runnable{
private BlockingQueue<Integer> queue;
Integer firstNumber = 0;
private final ReentrantLock lock = new ReentrantLock();
public MyConsumer(FixedBlockingQueue fQueue){
this.queue = fQueue.getQueue();
}
/* TODO : Compare two consecutive integers in queue are same or not*/
public void run(){
Integer secondNumber = 0;
while (true){
try{
lock.lock();
System.out.println("queue size:"+queue.size());
if (queue.size() > 0) {
secondNumber = queue.remove();
System.out.println("Removed:"+secondNumber);
System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber);
if (firstNumber.intValue() == secondNumber.intValue()){
System.out.println("Numbers matched:"+firstNumber);
}
firstNumber = secondNumber;
}
Thread.sleep(1000);
}catch(Exception err){
err.printStackTrace();
}finally{
lock.unlock();
}
}
}
}
輸出:
Capactiy:1
adding:1
如果我從
es.submit(new MyConsumer(queue));
//new Thread(new MyConsumer(queue)).start();
更改代碼以
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(queue)).start();
消費者線程正常啓動。
輸出:
Capactiy:1
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:0:1
adding:2
queue size:1
Removed:2
Numbers:Num1:Num2:1:2
adding:3
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
adding:4
queue size:1
Removed:4
Numbers:Num1:Num2:3:4
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:4:1
adding:2
queue size:1
Removed:2
adding:3
Numbers:Num1:Num2:1:2
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
在第一種方法:
我知道這個編號不被消費者食用,但它不應該阻止提交的其他Producer
任務理想。
如果是這種情況,那麼使用ExecutorService
作爲簡單Threads
的替代品不可能達到100%?
我必須按到達順序插入元素,因此選擇與ReentrantLock單線程。 100多個線程產生整數,我必須按照到達順序檢查兩個連續的整數。 –
@Ravindrababu,''BlockingQueue'是線程安全的「的一部分,你會認爲額外的鎖定會貢獻任何你不會從'BlockingQueue'中自動獲得的東西? –
我稍後修復了消費者部分,它接受與Producer相同的類。 –