2011-03-11 45 views
3

我們在TChan上有一些傾銷價值,然後消費者處理這些價值。但消費者跟不上,所以我們獲得了大量的內存使用量,因爲製作人在頻道上傾銷了大量內容,但消費者並沒有跟上。如果通道隊列變成一定大小或者某種東西,是否有直接的方法讓製作者阻止,那麼我們可以讓製片人等待消費者趕上?如何在Haskell中的TChan生產者/消費者情況中扼制生產者?

+2

爲 「有限tchan」 第一個搜索結果:[github上/ JNB /有界tchan(https://github.com/jnb/bounded-tchan) – ephemient 2011-03-11 20:20:40

回答

3

像約翰的答案一樣,我建議你自己建一個有界的TChan。我的代碼是不同的,因爲它:

  1. 增加了抽象(使BTChan的ADT)
  2. 角落的情況下消除由於他讀電流的大小在IO。
  3. 嘗試在閱讀時不會構建大小爲TVar的thunk(寫作時不重要,因爲thunk只能是「一個深度」 - 下一個操作總是需要評估大小)。
  4. 現在是hackage:http://hackage.haskell.org/package/bounded-tchan

注:說實話,如果我是你,我會忽略所有這些問題的答案,只是在他的評論鏈接到代碼ephemient去(除非它原來是壞碼)。我敢打賭,它做的和我在這裏做的一樣,但有更多的想法。

{-# LANGUAGE BangPatterns #-} 
module BTChan 
     (BTChan 
     , newBTChanIO 
     , newBTChan 
     , writeBTChan 
     , readBTChan 
     ) where 

import Control.Concurrent.STM 

data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int) 

-- | `newBTChan m` make a new bounded TChan of max size `m` 
newBTChanIO :: Int -> IO (BTChan a) 
newBTChanIO m = do 
    szTV <- newTVarIO 0 
    c <- newTChanIO 
    return (BTChan m c szTV) 

newBTChan :: Int -> STM (BTChan a) 
newBTChan m 
     | m < 1 = error "BTChan's can not have a maximum <= 0!" 
     | otherwise = do 
     szTV <- newTVar 0 
     c <- newTChan 
     return (BTChan m c szTV) 

writeBTChan :: BTChan a -> a -> STM() 
writeBTChan (BTChan mx c szTV) x = do 
     sz <- readTVar szTV 
     if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x 

readBTChan :: BTChan a -> STM a 
readBTChan (BTChan _ c szTV) = do 
     x <- readTChan c 
     sz <- readTVar szTV 
     let !sz' = sz - 1 
     writeTVar szTV sz' 
     return x 

sizeOfBTChan :: BTChan a -> STM Int 
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV 

爲STM程序員的一些注意事項:

  • 顯式調用retry將產生,把你的Haskell線程處於阻塞狀態等待TVar的一個或TChan的狀態改變,因此它可以重試。這是您如何避免檢查IO中的值並使用yield函數。
  • 像MVars一樣,TVars可以指thunk,它通常不是你想要的。也許有人應該制定一個定義STVarSTChan,SBTChanBTChan(嚴格和/或有界TVars和TChans)的hackage包。
  • 實際上有必要編寫newBTChanIO而不是槓桿作用newBTChan,因爲new{TVar,TChan}IO的實現即使在unsafePerformIO下也能正常工作,atomically不能這樣做。

編輯:你可以通過將TVar分成一個讀者和一個作者來獲得更好的性能(取決於你使用的邊界)2-5倍,從而減少爭用。使用標準驗證。改進後的版本0.2.1已經在使用。

+0

你有沒有明確調用'retry'的引用總會產生? – 2011-03-13 12:26:21

+0

@John它並不總是yeild,但它不會重試,直到它所讀取的TVar中的一個被重寫,這意味着當值不變時,它將不會重試,這將導致再次調用相同的'retry'。 SimonPJ鏈接了一些非常可讀的[STM論文](http://research.microsoft.com/en-us/um/people/simonpj/papers/stm/index.htm)。 – 2011-03-13 16:16:26

0

關於hackage有BoundedChan,但使用MVars,而不是STM。你可以用它來學習如何編寫你自己的代碼 - 它只是關於一頁代碼。

2

也許最簡單的解決辦法是增加一個TVar指示信道單元的數量:

type BoundedChan a = (TChan a, TVar Int, Int) 

writeBoundedChan :: BoundedChan a -> a -> IO() 
writeBoundedChan [email protected](tchan, tsz, maxsz) x = do 
    cursz' <- readTVarIO tsz 
    if cursz' >= maxsz 
    then yield >> writeBoundedChan bc x 
    else atomically $ do 
     writeTChan tchan a 
     cursz <- readTVar tsz 
     writeTVar tsz (cursz+1) 

readBoundedChan :: BoundedChan a -> IO a 
readBoundedChan (tchan, tsz, maxsz) = atomically $ do 
    x <- readTChan tchan 
    cursz <- readTVar tsz 
    writeTVar tsz (cursz-1) 
    return x 

注意,最大尺寸可如果你有多個生產略有超過,因爲cursz值可以改變在兩個閱讀之間。

+0

你並不需要檢查'cursz在原子之前在IO中。只需在原文中完成一次閱讀:'readTVar >> = \ cursz - >如果cursz> = maxsz則重試其他...'注意在那裏使用'retry'。 – 2011-03-12 17:20:21

+0

@TomMD - 如果您在同一個「STM」塊中重試,線程不一定會屈服。 hackage文檔說它「可能會阻塞線程」,所以我不會認爲它總是會阻塞。我也不會相信這種行爲在未來的實施中不會改變。我可能只是偏執狂,因爲我不明白爲什麼非阻塞重試會有用,但這是我的看法。 – 2011-03-13 12:25:26

+0

產出是「重試」的工作方式,它不會在相同的,不變的TVars上運行相同的STM操作。它「可能」會產生收益,因爲如果TVAR被另一個STM操作與「重試」操作同時更改,它就不會產生。如果你仍然對此感到不適,那麼你可以讓'writeBoundedChan'調用一個'STM'操作,當'False'返回'Bool'和'yeild'時,返回'True' - 這應該可以工作(浪費更多的處理由於剩餘一個可以調度的線程,因此它的功耗比'retry'更高),並且移除超大尺寸的錯誤。 – 2011-03-13 16:29:58

1

我知道這個遊戲有點遲,但是你也可以實現跳過頻道,它允許對頻道進行非阻塞寫入,但是「覆蓋」了任何未被看到的舊值讀者。

import Control.Concurrent.MVar 

data SkipChan a = SkipChan (MVar (a, [MVar()])) (MVar()) 

newSkipChan :: IO (SkipChan a) 
newSkipChan = do 
    sem <- newEmptyMVar 
    main <- newMVar (undefined, [sem]) 
    return (SkipChan main sem) 

putSkipChan :: SkipChan a -> a -> IO() 
putSkipChan (SkipChan main _) v = do 
    (_, sems) <- takeMVar main 
    putMVar main (v, []) 
    mapM_ (\sem -> putMVar sem()) sems 

getSkipChan :: SkipChan a -> IO a 
getSkipChan (SkipChan main sem) = do 
    takeMVar sem 
    (v, sems) <- takeMVar main 
    putMVar main (v, sem:sems) 
    return v 

dupSkipChan :: SkipChan a -> IO (SkipChan a) 
dupSkipChan (SkipChan main _) = do 
    sem <- newEmptyMVar 
    (v, sems) <- takeMVar main 
    putMVar main (v, sem:sems) 
    return (SkipChan main sem)