2012-01-31 112 views
2

假設你需要處理2個線程,一個ReaderProcessor工作。Java的多線程,讓線程並行

Reader將讀取流數據的一部分,並將其傳遞給Processor,這將做一些事情。

的想法是不是太多數據的強調讀者。

在設立,我

// Processor will pick up data from pipeIn and will place the output in pipeOut 
Thread p = new Thread(new Processor(pipeIn, pipeOut)); 
p.start(); 

// Reader will pick a bunch of bits from the InputStream and place it to pipeIn 
Thread r = new Thread(new Reader(inputStream, pipeIn)); 
r.start(); 

不用說,在初始化時沒有管爲空。

我想......當Processor已啓動它試圖從pipeIn閱讀,在下面的循環:

while (readingShouldContinue) { 
     Thread.sleep(1); // To avoid tight loop 

     byte[] justRead = readFrom.getDataCurrentlyInQueue(); 
     writeDataToPipe(processData(justRead)); 
    } 

如果沒有數據來寫,會寫什麼,應該是沒問題。

Reader開始活躍,並從流拿起一些數據:

while ((in.read(buffer)) != -1) { 
     // Writes to what processor considers a pipeIn 
     writeTo.addDataToQueue(buffer); 
    } 

管道本身,我同步訪問數據。

public byte[] getDataCurrentlyInQueue() { 

    synchronized (q) { 
     byte[] a = q.peek(); 
     q.clear(); 
     return a; 
    } 
} 

我期望2個線程並行半運行,ReaderProcessor之間交換活動。然而,什麼情況是

  • 閱讀器讀取所有塊前面
  • 處理器把一切都爲1單塊

我在想什麼嗎?

+0

InputStream.read(byte [])是一個阻止呼叫 – 2012-01-31 02:47:30

+0

@guido嗨guido。有什麼可以推薦我嗎? – JAM 2012-01-31 02:50:29

+0

試着看着java。nio SocketChannel和ByteBuffer類:AH和管道類) – 2012-01-31 02:52:33

回答

1

您重新創建了部分java併發庫。如果你使用BlockingQueue來模擬線程而不是自己同步事物,它會使事情變得更容易。

基本上你的生產者會把大塊放在BlockingQueue上,你的消費者會在隊列中循環並調用get()。這樣生產者會阻止/等待,直到隊列中有新的塊。

+0

非常酷;謝謝 – JAM 2012-02-01 03:51:54

2

我在想什麼?

(首先,我要指出,你忽略了所需要的特定事實爲依據的答案代碼和其它信息的一些關鍵位。)

我能想到的一些的可能的解釋:

  • 在您的應用程序中可能只是一個錯誤。沒有很多猜測這個錯誤的可能性,但是如果你向我們展示了更多的代碼......

  • OS線程調度程序將傾向於讓活動線程繼續運行,直到它阻塞。如果你的處理器只有一個內核(或者如果操作系統只允許你的應用程序使用一個內核),那麼第二個線程可能會餓死......足夠第一個線程完成。

  • 即使您有多個內核,OS線程調度程序可能分配額外內核的速度也會很慢,尤其是在第二個線程啓動並立即阻塞的情況下。

  • 在緩衝中可能會導致工作不在隊列中出現,這可能會導致一些「粒度」效應。 (你可以認爲這是一個錯誤......或作爲調整的問題。)

  • 這可能僅僅是你沒有給多線程在踢應用足夠的負荷。

最後,我也找不到Thread.sleep的東西。正確編寫的多線程應用程序不會使用Thread.sleep來處理任何事情,除了長期延遲;例如在後臺執行定期維護任務的線程。如果您使用sleep而不是阻塞,那麼1)您可能會使應用程序不響應,並且2)您可能會鼓勵OS線程調度程序爲線程提供更少的時間片。這很可能是因爲線程匱乏導致您遇到問題的根源。

0

讀者正在讀取第一個時間片之前的所有內容。這意味着在處理器有機會運行之前,讀數已經完成。

嘗試增加正在讀取的字節數量,或以某種方式減慢讀取器的速度;可能每隔一段時間都會有一次sleep()調用。

Btw。不要投票。這是一個可怕的CPU週期浪費,它根本不能擴展。 也使用同步隊列並忘記手動鎖定。 http://docs.oracle.com/javase/tutorial/collections/implementations/queue.html

0

使用時,你需要確定你

  • 是否有可以並行有效地進行工作的多個線程。
  • 不會增加比您可能實現的改進更多的開銷
  • 操作系統,或某些庫尚未優化以執行您正在嘗試執行的操作。

就你而言,你有一個不使用多線程的好例子。操作系統已經調整爲預讀並在請求之前緩衝數據。讀者所做的工作是相對平庸的。創建新緩衝區,將它們添加到隊列並在線程之間傳遞數據的開銷可能大於並行執行的工作量。

當您嘗試使用多個線程完成單個線程最好完成的任務時,您將得到奇怪的分析/調整結果。

+1對於一個好問題。