2012-07-04 21 views
7

我需要生成ñ消費者線程,能同時處理相同的InputStream,例如: - 以某種方式進行轉換,計算校驗和數字簽名等,這些消費者不依賴於對方,他們都正在使用第三方庫,它接受InputStream作爲數據源。並行處理與獨立的消費者

所以,我能做的就是 - 創造InputStream的一些實施,這將

  • 從「父母」流
  • 疏通消費者數據的讀取塊
  • 等到每個消費者讀取整個塊
  • 下一個讀取塊

,同時看着簡單,它可能上升象livelo各種問題當某些消費者死亡時,實施所有InputStream方法,使用障礙/鎖存器等控制消費者自己的叉/加入。

一位好友告訴我,實施了半小時,這使我的晚上。

我寧願要麼使用足夠成熟的東西(谷歌搜索沒有結果,因此,我的谷歌福不夠好?)或不打擾和複製整個「源」流到臨時文件並將其用作數據源。後一種解決方案似乎更可靠,但最終可能會創建GB級文件(例如處理流式音頻時)。

+0

你可以將數據寫入文件併產生N個FileInputStreams嗎? –

+0

@JonLin正如他在問題結束時所說的那樣,他可以。 –

回答

3

我看到它的方式,你至少應該有某種緩衝,這樣不同的消費者可以通過流在不同的步調沒有對所有被不斷地陷入了目前最慢的消費者。這基本上確保了最差情況下的性能,並且併發性的好處很小。

你可以,比方說,標記每一塊與迄今爲止已使用它,然後刪除那些完全耗光了消費者。也許這可以通過每個消費者持有對其尚未使用的每個塊的引用來實現,這將允許GC自動處理使用的塊。生產者可能會在塊中保留一個WeakReference的列表,這樣它就可以處理尚未使用的塊的數量,並根據該塊進行限制。

我也在考慮每個線程有一個單獨的InputStream實例,它與生產者InputStream進行內部通信。通過這種方式,您可以輕鬆解決您的活鎖危險:try ... finally { is.close(); } - 即將消亡的消費者關閉自己的輸入流。這是傳達給生產者。

我有一些想法,使用每消費一ArrayBlockingQueue。要確保所有的消費者都得到適當的餵養,而不是讓生產者阻止或者忙碌等待會有一些困難。

+0

我不會說它是非常有益的 - 有5個消費者爲1個secons和1個消費者工作2秒,併發調用將會給2秒,而順序會給7秒。或者我在這裏錯過了什麼?有了標記的塊和緩衝區,我會打內存消耗,我想避免。 – jdevelop

+0

是的,你說什麼是不可避免的。但是,如果您的消費者的平均消費者平衡,但他們的表現差異很大,那麼如果您始終等待當前滯後的每位消費者,就會失去併購的機會。緩衝將在那裏幫助。如果你引入線程優先級平衡,你實際上可以實現這種情況。 –

0

你有沒有考慮使用管道流?您的製作人可以擁有一個或多個PipedOuputStream,其中它會從文件中讀取任何內容。在管道的另一端,您可以在對應的PipedInputstream(這是一個可與庫共享的InputStream)上讀取不同的消費者線程。

您的生產者線程可以決定通過哪個管道發送數據,通過這種方式爲管道另一端的給定消費者線程讀取提供要處理的數據。

如果您需要從消費者線程中獲取數據,則可以創建另一個管道,以相反的方向將數據發回給您。

+1

一旦任何消費者落後,PipedOutputStream將會阻止生產者,使所有其他消費者捱餓。 –

0

您可以嘗試一些Java消息傳遞服務(JMS)實現,如Apache ActiveMQ

在你的情況下,你需要創建一個所謂的主題(見Topics vs. Queues)。主題由生產者創建,併發布給N個消費者,這些消費者可以同時運行,每個消費者接收完全相同的數據。

既然你想使用InputStream s有一章關於如何send messages are streams

我想,通常情況下,生產者和消費者將是獨立的進程,可能運行在網絡上的不同機器上。不過,我認爲您可以將其配置爲在單個JVM中完全運行。這將取決於JMS的實施。這些也很有名:HornetQ by JBossRabbitMQ,和其他一大堆。

相關問題