2013-04-18 35 views
0

我正在嘗試編寫多線程代碼。但是我真的無法理解我從哪裏開始。我的頭也在打。請幫幫我。在使用ArrayBlockingQueue和互斥鎖的多線程代碼中的疑問

我的任務是,

  1. 有長度爲1的一個隊列,稱之爲pending_tasks,包含這需要一些處理任務。
  2. 還有另一個長度爲1的隊列,稱爲completed_tasks,它包含完成處理並準備交付的任務。

我的實踐與思考,

  1. 首先提出兩個阻塞隊列,pending_taskscompleted_tasks
  2. 一個線程(製作者)總是監聽來自外部的任務,如果被放入pending_tasks
  3. 一個線程(消費者)總是準備好從pending_tasks開始執行任務並開始處理,然後放入completed_tasks
  4. 然後再次來到pending_tasks,並且每當有任務到來時,開始相同的處理。
  5. 基本上,它是一個單一的生產者 - 單個消費者問題。

我的困惑,

我知道,它可以通過使用ArrayBlockingQueue和互斥是代碼。但我不明白我該如何開始。我對互斥體有了很好的理解,我從這link中瞭解了互斥體,並且也很好地理解了blockingQueue,因爲我在本網站上閱讀了很多問題。

您可以給我一些實現指導,以便我可以編寫這個多線程代碼。

我已經寫了一些相同的代碼,但這並沒有達到我的任務的最終目標。

在此先感謝。尋找你的迴應。

編輯編號1

請參閱我的下面的代碼。此代碼工作正常,但此代碼缺少一個功能。請幫我補充一點,給一些指導來做到這一點。

功能是,

  1. 當生產者線程放入隊列pending_task一定的價值,那麼它會等待一段時間存在。如果在那個時候消費者把結果交給消費者,那麼它就OK。否則,它說超時,並且生產者在pending_task隊列中獲取另一個值並輸入pput,並開始相同的過程。

請幫我添加上面的功能。我認爲我們必須在生產者線程和消費者線程之間進行通信,線程通信是通過使用Mutex(我認爲)來完成的。請幫我實現相同

我的代碼,

多線程類

package multithread; 

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 

public class MultiThread { 

    public static BlockingQueue<Integer> pending_task; 
    public static BlockingQueue<Integer> completed_task; 

    public MultiThread(int length) { 
     pending_task = new ArrayBlockingQueue<Integer>(length, true); 
     completed_task = new ArrayBlockingQueue<Integer>(length, true); 
    } 
} 

監製類

package multithread; 

import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Producer implements Runnable { 

    @Override 
    public void run() { 
     for (int i = 0; i < 10; i++) { 
      try { 
       System.out.println("PRODUCER: Try to put value " + i + " in the pending queue"); 
       MultiThread.pending_task.put(i); 
       System.out.println("PRODUCER: Successfully put value " + i + " in the pending queue, now its turn to consumer"); 
      } catch (InterruptedException ex) { 
       Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
    } 
} 

消費者Ç姑娘

package multithread; 

import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Consumer implements Runnable { 

    @Override 
    public void run() { 
     for (int i = 0; i < 10; i++) { 
      try { 
       System.out.println("CONSUMER: Try to take value from the pending queue"); 
       int val = MultiThread.pending_task.take(); 
       System.out.println("CONSUMER: Successfully take value, and that is " + val); 
       System.out.println("CONSUMER: Processing starts"); 
       Thread.sleep(1000); 
       System.out.println("CONSUMER: Processing ends"); 
       System.out.println("CONSUMER: Try to put that that value in completed queue, and the value is " + val); 
       MultiThread.completed_task.put(val); 
       System.out.println("CONSUMER: Successfully put into completed queue"); 

       //Serve this value to the corresponding user 
      } catch (InterruptedException ex) { 
       Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 

     } 
    } 
} 

DeliveryBoy類

package multithread; 

import java.util.logging.Level; 
import java.util.logging.Logger; 

public class DeliveryBoy implements Runnable { 

    @Override 
    public void run() { 
     for (int i = 0; i < 10; i++) { 
      try { 
       System.out.println("DELIVERYBOY: Waiting for the value near completed queue"); 
       int val = MultiThread.completed_task.take(); 
       System.out.println("DELIVERYBOY: Succesfully take value from completed queue and the vlue is " + val); 
       //Serve this value to the corresponding user 
      } catch (InterruptedException ex) { 
       Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 

     } 
    } 
} 

測試類

package multithread; 

public class Test { 

    public static void main(String[] args) { 
     // TODO code application logic here 
     MultiThread ml = new MultiThread(1); 
     new Thread(new Producer()).start(); 
     new Thread(new Consumer()).start(); 
     new Thread(new DeliveryBoy()).start(); 
    } 
} 
+3

您的問題很可能會被關閉,除非您發佈當前的代碼併爲什麼不起作用而提出問題。目前,您的問題非常模糊,並且是開放式的,只能由爲您編寫整個事情的人回答。 –

+0

你的互斥鏈接相當陳舊。雖然您仍然可以使用等待/通知消費者/生產者模式,但還有更多有用的和更高級別的解決方案。我建議你閱讀http://docs.oracle.com/javase/tutorial/essential/concurrency/。 –

+0

@DuncanJones我添加了我的代碼。請參閱 – devsda

回答

1

ArrayBlockingQueue#put

公共無效認沽(E E) 到此隊列的尾部拋出InterruptedException的

插入指定的元素,等待 **可用空間,如果隊列已滿

ArrayBlockingQueue#take

公共電子取() 拋出InterruptedException的

從接口BlockingQueue複製的描述檢索並移除此隊列的隊列 等待有需要時,直到元素變爲 可用

所以你需要做的就是從你的線程中調用這些方法。
試試這個(研究javadoc),當你有更具體的問題時,你可以再問一次。

+0

你在答案中解釋的事情已經爲我所知。我覺得我在理論上已經很好。但我無法將其轉換爲代碼。我做了一些代碼(請參閱問題的更新部分),但有很多問題,請幫我解決這些問題。是的,我想增加互斥概念。 – devsda

+0

請幫幫我。 – devsda

+1

@devsda:你的代碼在哪裏? – Cratylus