我們在TChan上有一些傾銷價值,然後消費者處理這些價值。但消費者跟不上,所以我們獲得了大量的內存使用量,因爲製作人在頻道上傾銷了大量內容,但消費者並沒有跟上。如果通道隊列變成一定大小或者某種東西,是否有直接的方法讓製作者阻止,那麼我們可以讓製片人等待消費者趕上?如何在Haskell中的TChan生產者/消費者情況中扼制生產者?
回答
像約翰的答案一樣,我建議你自己建一個有界的TChan。我的代碼是不同的,因爲它:
- 增加了抽象(使
BTChan
的ADT) - 角落的情況下消除由於他讀電流的大小在IO。
- 嘗試在閱讀時不會構建大小爲TVar的thunk(寫作時不重要,因爲thunk只能是「一個深度」 - 下一個操作總是需要評估大小)。
- 現在是hackage:http://hackage.haskell.org/package/bounded-tchan
注:說實話,如果我是你,我會忽略所有這些問題的答案,只是在他的評論鏈接到代碼ephemient去(除非它原來是壞碼)。我敢打賭,它做的和我在這裏做的一樣,但有更多的想法。
{-# LANGUAGE BangPatterns #-}
module BTChan
(BTChan
, newBTChanIO
, newBTChan
, writeBTChan
, readBTChan
) where
import Control.Concurrent.STM
data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int)
-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
szTV <- newTVarIO 0
c <- newTChanIO
return (BTChan m c szTV)
newBTChan :: Int -> STM (BTChan a)
newBTChan m
| m < 1 = error "BTChan's can not have a maximum <= 0!"
| otherwise = do
szTV <- newTVar 0
c <- newTChan
return (BTChan m c szTV)
writeBTChan :: BTChan a -> a -> STM()
writeBTChan (BTChan mx c szTV) x = do
sz <- readTVar szTV
if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x
readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
x <- readTChan c
sz <- readTVar szTV
let !sz' = sz - 1
writeTVar szTV sz'
return x
sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV
爲STM程序員的一些注意事項:
- 顯式調用
retry
將產生,把你的Haskell線程處於阻塞狀態等待TVar
的一個或TChan
的狀態改變,因此它可以重試。這是您如何避免檢查IO
中的值並使用yield
函數。 - 像MVars一樣,TVars可以指thunk,它通常不是你想要的。也許有人應該制定一個定義
STVar
,STChan
,SBTChan
和BTChan
(嚴格和/或有界TVars和TChans)的hackage包。 - 實際上有必要編寫
newBTChanIO
而不是槓桿作用newBTChan
,因爲new{TVar,TChan}IO
的實現即使在unsafePerformIO
下也能正常工作,atomically
不能這樣做。
編輯:你可以通過將TVar分成一個讀者和一個作者來獲得更好的性能(取決於你使用的邊界)2-5倍,從而減少爭用。使用標準驗證。改進後的版本0.2.1已經在使用。
你有沒有明確調用'retry'的引用總會產生? – 2011-03-13 12:26:21
@John它並不總是yeild,但它不會重試,直到它所讀取的TVar中的一個被重寫,這意味着當值不變時,它將不會重試,這將導致再次調用相同的'retry'。 SimonPJ鏈接了一些非常可讀的[STM論文](http://research.microsoft.com/en-us/um/people/simonpj/papers/stm/index.htm)。 – 2011-03-13 16:16:26
關於hackage有BoundedChan,但使用MVars,而不是STM。你可以用它來學習如何編寫你自己的代碼 - 它只是關於一頁代碼。
也許最簡單的解決辦法是增加一個TVar
指示信道單元的數量:
type BoundedChan a = (TChan a, TVar Int, Int)
writeBoundedChan :: BoundedChan a -> a -> IO()
writeBoundedChan [email protected](tchan, tsz, maxsz) x = do
cursz' <- readTVarIO tsz
if cursz' >= maxsz
then yield >> writeBoundedChan bc x
else atomically $ do
writeTChan tchan a
cursz <- readTVar tsz
writeTVar tsz (cursz+1)
readBoundedChan :: BoundedChan a -> IO a
readBoundedChan (tchan, tsz, maxsz) = atomically $ do
x <- readTChan tchan
cursz <- readTVar tsz
writeTVar tsz (cursz-1)
return x
注意,最大尺寸可如果你有多個生產略有超過,因爲cursz值可以改變在兩個閱讀之間。
你並不需要檢查'cursz在原子之前在IO中。只需在原文中完成一次閱讀:'readTVar >> = \ cursz - >如果cursz> = maxsz則重試其他...'注意在那裏使用'retry'。 – 2011-03-12 17:20:21
@TomMD - 如果您在同一個「STM」塊中重試,線程不一定會屈服。 hackage文檔說它「可能會阻塞線程」,所以我不會認爲它總是會阻塞。我也不會相信這種行爲在未來的實施中不會改變。我可能只是偏執狂,因爲我不明白爲什麼非阻塞重試會有用,但這是我的看法。 – 2011-03-13 12:25:26
產出是「重試」的工作方式,它不會在相同的,不變的TVars上運行相同的STM操作。它「可能」會產生收益,因爲如果TVAR被另一個STM操作與「重試」操作同時更改,它就不會產生。如果你仍然對此感到不適,那麼你可以讓'writeBoundedChan'調用一個'STM'操作,當'False'返回'Bool'和'yeild'時,返回'True' - 這應該可以工作(浪費更多的處理由於剩餘一個可以調度的線程,因此它的功耗比'retry'更高),並且移除超大尺寸的錯誤。 – 2011-03-13 16:29:58
我知道這個遊戲有點遲,但是你也可以實現跳過頻道,它允許對頻道進行非阻塞寫入,但是「覆蓋」了任何未被看到的舊值讀者。
import Control.Concurrent.MVar
data SkipChan a = SkipChan (MVar (a, [MVar()])) (MVar())
newSkipChan :: IO (SkipChan a)
newSkipChan = do
sem <- newEmptyMVar
main <- newMVar (undefined, [sem])
return (SkipChan main sem)
putSkipChan :: SkipChan a -> a -> IO()
putSkipChan (SkipChan main _) v = do
(_, sems) <- takeMVar main
putMVar main (v, [])
mapM_ (\sem -> putMVar sem()) sems
getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main sem) = do
takeMVar sem
(v, sems) <- takeMVar main
putMVar main (v, sem:sems)
return v
dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main _) = do
sem <- newEmptyMVar
(v, sems) <- takeMVar main
putMVar main (v, sem:sems)
return (SkipChan main sem)
- 1. Java生產者 - 消費者:生產者不「通知()」消費者
- 2. 生產者/消費者
- 3. 生產者消費者
- 4. POSIX生產者 - 消費者
- 5. Clojure生產者消費者
- 6. LinkedBlockingQueue - 生產者/消費者
- 7. Scala生產者 - 消費者
- 8. 生產者,消費者POSIX
- 9. 生產者消費者在Java中
- 10. 生產消費者
- 11. Grails中的生產者/消費者?
- 12. C++中的消費者/生產者
- 13. wcf中的消費者生產者
- 14. 如何在使用Semphores的生產者 - 消費者中消費?
- 15. 如何線程(生產者/消費者)
- 16. 生產者 - 消費者多個生產者多個隊列單個消費者
- 17. 生產者/消費者線程中的油門消費者
- 18. 生產者/消費者模式與批生產者
- 19. 同步生產者,消費者和生產者隊列
- 20. 生產者/消費者 - 生產者使用高CPU
- 21. 生產者消費者請求取消
- 22. 生產者 - 消費者在python
- 23. 生產者消費者在PHP和Java
- 24. 一個生產者,兩位消費者作用於由生產者生產
- 25. 生產者和消費者的sem_wait()
- 26. 雙排隊的消費者生產者
- 27. RxJava的生產者 - 消費者
- 28. Java的消費者/生產者
- 29. 消費者生產者多線程消費者不會消逝
- 30. 在生產者/消費者模式中,我如何殺死消費者線程?
爲 「有限tchan」 第一個搜索結果:[github上/ JNB /有界tchan(https://github.com/jnb/bounded-tchan) – ephemient 2011-03-11 20:20:40