背景如何在的TVar
添加一個終結在回答question,我建立和uploaded a bounded-tchan(不會有適合我上傳jnb's version)。如果名稱不夠,則有界tchan(BTChan)是具有最大容量的STM通道(如果通道處於容量,則寫入塊)。
最近,我收到了添加像在regular TChan's中的dup功能的請求。從而開始了這個問題。
的BTChan看起來如何
BTChan的簡化(實際上非功能性)的觀點是下面。
data BTChan a = BTChan
{ max :: Int
, count :: TVar Int
, channel :: TVar [(Int, a)]
, nrDups :: TVar Int
}
你寫信給你包括元組的DUP(nrDups
)的數量的頻道每次 - 這是一個「單獨的元素計」,這表明有多少讀者已經得到了這個元素。
每個閱讀器都會減少它讀取的元素的計數器,然後將它的讀指針移動到列表中的下一個元素。如果閱讀器將計數器遞減到零,則count
的值將遞減以正確反映通道上的可用容量。
要清楚所需的語義:通道容量指示通道中排隊的元素的最大數量。任何給定的元素都會排隊,直到每個dup的讀者都已收到該元素。沒有元素應該排隊等候GCed dup(這是主要問題)。
例如,讓有一個信道的3名的DUP(C1,C2,C3)爲2容量,其中2項被寫入信道,那麼所有項被讀出的c1
和c2
。頻道仍然已滿(0剩餘容量),因爲c3
尚未消耗其副本。在任何時間點,如果所有對c3
的引用都被刪除(因此c3
爲GCed),則應釋放容量(在此情況下恢復爲2)。
這裏的問題:假設我有以下代碼
c <- newBTChan 1
_ <- dupBTChan c -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
導致BTChan的樣子:
在讀計數"hello"
末
BTChan 1 (TVar 0) (TVar []) (TVar 1) --> -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2) --> -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) --> -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2) -- OH NO!
通知仍然1
?這意味着該消息不會被視爲消失(即使它會在實際實施中得到GCed),我們的count
也不會減少。由於通道處於容量狀態(最多1個元件),因此寫入器將始終阻止。
我想每次調用創建的終結器dupBTChan
。當收集一個重複的(或原始的)通道時,剩餘的要在該通道上讀取的所有元素將獲得每個元素的計數遞減,並且nrDups
變量將遞減。因此,將來的寫入將具有正確的count
(不會爲GCed通道未讀取的變量保留空間的count
)。
解決方案1 - 手動資源管理(我想避免的)
JNB的有界tchan實際上有這個原因手動資源管理。請參閱cancelBTChan
。我正在努力爲用戶弄錯一些東西(不是說手動管理在許多情況下不是正確的做法)。
解決方案2 - 通過阻斷對TVars使用異常(GHC不能做到這一點我多麼希望)
編輯這個解決方案,和解決方案3這僅僅是一個分拆,不行!由於bug 5055(WONTFIX),GHC編譯器向兩個被阻塞的線程發送異常,即使其中有一個是足夠的(這在理論上是可確定的,但在GHC GC中不可行)。
如果獲得BTChan
的所有方法都是IO,我們可以在一個線程中讀取/重試給定的BTChan
唯一的額外(虛擬)TVar字段。當所有其他對TVar的引用都被刪除時,新線程將捕獲異常,所以它會知道何時遞減nrDups
和個別元素計數器。這應該工作,但是我的力量所有用戶使用IO得到他們BTChan
S:
data BTChan = BTChan { ... as before ..., dummyTV :: TVar() }
dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
... as before ...
d <- newTVarIO()
let chan = BTChan ... d
forkIO $ watchChan chan
return chan
watchBTChan :: BTChan a -> IO()
watchBTChan b = do
catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
case fromException e of
BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
ls <- readTVar (channel b)
writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
_ -> watchBTChan b
編輯:是的,這是一個貧窮的芒終結,我沒有什麼特別的原因,以避免使用addFinalizer
。這將是同樣的解決方案,仍然強迫使用IO afaict。
解決方案3:比液2一個清潔的API,但GHC仍然沒有通過調用initBTChanCollector
,將監視一組,這些假TVars(從溶液2支持
用戶啓動管理器線程)並進行必要的清理。基本上,它將IO推入另一個線程,該線程知道通過全局(unsafePerformIO
編輯)TVar
做什麼。事情基本上像解決方案2一樣工作,但是BTChan's的創建仍然可以是STM。運行initBTChanCollector
失敗會導致任務隨着進程的運行而不斷增長(空間泄漏)。
解決方案4:絕不允許丟棄BTChan
小號
這類似於忽視的問題。如果用戶從不放棄重複BTChan
則問題消失。
解決方案5 我看到ezyang的答案(完全有效和讚賞),但真的想保持目前的API只是一個「DUP」功能。
**解決方案6 ** 請告訴我有更好的選擇。
編輯: 我implemented solution 3(沒有經過測試的alpha版本),並使得全球本身處理的潛在空間泄露一個BTChan
- 這瓚應該可能有容量爲1所以忘了運行init
顯示了非常快的,但是這一個小小的變化。這在GHCi(7.0.3)中起作用,但似乎是偶然的。 GHC會向兩個被阻塞的線程(有效的線程讀取BTChan和正在監視的線程)拋出異常,所以如果您在另一個線程拋棄它時引用了BTChan而被阻止,那麼您就死掉了。
我不明白你到底在想什麼。重複頻道的語義應該與資源有關?如果它和重複項都已滿,通道會被阻止?如果其中一個滿了? – 2011-03-27 08:18:13
對,這些語義需要澄清。如果你嘗試實現「一個通道塊,如果它和重複都已滿」,那麼你需要問,我是否允許從隊列中刪除元素?如果答案是否定的,那麼你又有了一個無限制的渠道。 – 2011-03-27 09:59:20
(詢問請求dup的人使用它的計劃也是有用的。) – 2011-03-27 09:59:50