我正在嘗試編寫多線程代碼。但是我真的無法理解我從哪裏開始。我的頭也在打。請幫幫我。在使用ArrayBlockingQueue和互斥鎖的多線程代碼中的疑問
我的任務是,
- 有長度爲1的一個隊列,稱之爲
pending_tasks
,包含這需要一些處理任務。 - 還有另一個長度爲1的隊列,稱爲
completed_tasks
,它包含完成處理並準備交付的任務。
我的實踐與思考,
- 首先提出兩個阻塞隊列,
pending_tasks
和completed_tasks
。 - 一個線程(製作者)總是監聽來自外部的任務,如果被放入
pending_tasks
。 - 一個線程(消費者)總是準備好從
pending_tasks
開始執行任務並開始處理,然後放入completed_tasks
。 - 然後再次來到
pending_tasks
,並且每當有任務到來時,開始相同的處理。 - 基本上,它是一個單一的生產者 - 單個消費者問題。
我的困惑,
我知道,它可以通過使用ArrayBlockingQueue和互斥是代碼。但我不明白我該如何開始。我對互斥體有了很好的理解,我從這link中瞭解了互斥體,並且也很好地理解了blockingQueue,因爲我在本網站上閱讀了很多問題。
您可以給我一些實現指導,以便我可以編寫這個多線程代碼。
我已經寫了一些相同的代碼,但這並沒有達到我的任務的最終目標。
在此先感謝。尋找你的迴應。
編輯編號1
請參閱我的下面的代碼。此代碼工作正常,但此代碼缺少一個功能。請幫我補充一點,給一些指導來做到這一點。
功能是,
- 當生產者線程放入隊列pending_task一定的價值,那麼它會等待一段時間存在。如果在那個時候消費者把結果交給消費者,那麼它就OK。否則,它說超時,並且生產者在pending_task隊列中獲取另一個值並輸入pput,並開始相同的過程。
請幫我添加上面的功能。我認爲我們必須在生產者線程和消費者線程之間進行通信,線程通信是通過使用Mutex(我認爲)來完成的。請幫我實現相同
我的代碼,
多線程類
package multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MultiThread {
public static BlockingQueue<Integer> pending_task;
public static BlockingQueue<Integer> completed_task;
public MultiThread(int length) {
pending_task = new ArrayBlockingQueue<Integer>(length, true);
completed_task = new ArrayBlockingQueue<Integer>(length, true);
}
}
監製類
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("PRODUCER: Try to put value " + i + " in the pending queue");
MultiThread.pending_task.put(i);
System.out.println("PRODUCER: Successfully put value " + i + " in the pending queue, now its turn to consumer");
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
消費者Ç姑娘
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("CONSUMER: Try to take value from the pending queue");
int val = MultiThread.pending_task.take();
System.out.println("CONSUMER: Successfully take value, and that is " + val);
System.out.println("CONSUMER: Processing starts");
Thread.sleep(1000);
System.out.println("CONSUMER: Processing ends");
System.out.println("CONSUMER: Try to put that that value in completed queue, and the value is " + val);
MultiThread.completed_task.put(val);
System.out.println("CONSUMER: Successfully put into completed queue");
//Serve this value to the corresponding user
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
DeliveryBoy類
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DeliveryBoy implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
int val = MultiThread.completed_task.take();
System.out.println("DELIVERYBOY: Succesfully take value from completed queue and the vlue is " + val);
//Serve this value to the corresponding user
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
測試類
package multithread;
public class Test {
public static void main(String[] args) {
// TODO code application logic here
MultiThread ml = new MultiThread(1);
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new DeliveryBoy()).start();
}
}
您的問題很可能會被關閉,除非您發佈當前的代碼併爲什麼不起作用而提出問題。目前,您的問題非常模糊,並且是開放式的,只能由爲您編寫整個事情的人回答。 –
你的互斥鏈接相當陳舊。雖然您仍然可以使用等待/通知消費者/生產者模式,但還有更多有用的和更高級別的解決方案。我建議你閱讀http://docs.oracle.com/javase/tutorial/essential/concurrency/。 –
@DuncanJones我添加了我的代碼。請參閱 – devsda