2011-03-06 96 views
4

我正在用Haskell線程進行測試,我遇到了在通道中傳遞延遲評估值的問題。例如,使用N個工作線程和1個輸出線程,工作人員交流未評估的工作,輸出線程最終爲他們完成工作。Haskell中併發通道的嚴格評估技巧

我已經閱讀過有關這個問題的各種文檔,並看到了各種解決方案,但我只找到了一種可行的解決方案,其餘的都沒有。下面是一些代碼,其中工作線程開始一些可能需要很長時間的計算。我以降序啓動線程,以便第一個線程應該花費最長時間,而後面的線程應該提前完成。

import Control.Concurrent (forkIO) 
import Control.Concurrent.Chan -- .Strict 
import Control.Concurrent.MVar 
import Control.Exception (finally, evaluate) 
import Control.Monad (forM_) 
import Control.Parallel.Strategies (using, rdeepseq) 

main = (>>=) newChan $ (>>=) (newMVar []) . run 

run :: Chan (Maybe String) -> MVar [MVar()] -> IO() 
run logCh statVars = do 
    logV <- spawn1 readWriteLoop 
    say "START" 
    forM_ [18,17..10] $ spawn . busyWork 
    await 
    writeChan logCh Nothing -- poison the logger 
    takeMVar logV 
    putStrLn "DONE" 
    where 
    say mesg = force mesg >>= writeChan logCh . Just 

    force s = mapM evaluate s -- works 
-- force s = return $ s `using` rdeepseq -- no difference 
-- force s = return s -- no-op; try this with strict channel 

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen 
    embiggen i = i*i*i*i*i 

    readWriteLoop = readChan logCh >>= writeReadLoop 
    writeReadLoop Nothing = return() 
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop 

    spawn1 action = do 
     v <- newEmptyMVar 
     forkIO $ action `finally` putMVar v() 
     return v 

    spawn action = do 
     v <- spawn1 action 
     modifyMVar statVars $ \vs -> return (v:vs,()) 

    await = do 
     vs <- modifyMVar statVars $ \vs -> return ([], vs) 
     mapM_ takeMVar vs 

使用最常用的技巧,結果被報道爲了催生;也就是計算時間最長的計算。我解釋這意味着輸出線程做所有的工作:

-- results in order spawned (longest-running first = broken) 
START 
892616806655 
503999185040 
274877906943 
144162977343 
72313663743 
34464808608 
15479341055 
6484436675 
2499999999 
DONE 

我認爲這個問題的答案將是嚴格的渠道,但他們沒有工作。我知道WHNF對於字符串是不夠的,因爲這會強制最外層的構造函數(字符串的第一個字符爲零或缺省)。 rdeepseq應該完全評估,但它沒有區別。我發現的唯一工作是將Control.Exception.evaluate :: a -> IO a映射到字符串中的所有字符。 (請參閱在代碼中force功能評論幾種不同的選擇。)下面是與Control.Exception.evaluate結果:

-- results in order finished (shortest-running first = correct) 
START 
2499999999 
6484436675 
15479341055 
34464808608 
72313663743 
144162977343 
274877906943 
503999185040 
892616806655 
DONE 

那麼,爲什麼不嚴格的通道或rdeepseq產生這種結果?還有其他技術嗎?我誤解了爲什麼第一個結果被打破?

回答

5

這裏有兩個問題。

究其原因,第一次嘗試(使用一個明確的rnf)不起作用的是,通過使用return,您創建一個thunk,充分評估自身時,其被評估,但咚本身具有不評估。注意的評價類型爲a -> IO a:它返回IO值的事實意味着evaluate可以強制排序:

return (error "foo") >> return 1 == return 1 
evaluate (error "foo") >> return 1 == error "foo" 

其結果是,這種代碼:

force s = evaluate $ s `using` rdeepseq 

將工作(如與mapM_ evaluate s具有相同的行爲)。


採用嚴格的通道的情況下是有點麻煩,但我相信這是由於嚴格的併發的錯誤。昂貴的計算實際上是在工作線程上運行的,但它並不是很好(您可以通過在字符串中隱藏一些異步異常並查看異常表面上的哪個線程來明確檢查它)。

這是什麼問題?讓我們來看看代碼進行嚴格writeChan

writeChan :: NFData a => Chan a -> a -> IO() 
writeChan (Chan _read write) val = do 
    new_hole <- newEmptyMVar 
    modifyMVar_ write $ \old_hole -> do 
    putMVar old_hole $! ChItem val new_hole 
    return new_hole 

我們看到modifyMVar_叫上write之前,我們評估的thunk。操作的順序則是:進入

  1. writeChan
  2. 我們takeMVar write(阻止別人誰想要寫的信道)
  3. 我們評估了昂貴的thunk
  4. 我們把昂貴的thunk到通道
  5. 我們putMVar write,疏通所有其他線程的

由於在獲取鎖定之前執行評估,因此您不會看到evaluate變體的此行爲。

我會向Don發送郵件,看看他是否同意這種行爲是不理想的。

唐同意這種行爲是不理想的。我們正在開發一個補丁。

+0

第二部分非常有趣(仍在思考第一部)。所以嚴格的頻道在發送線程上評估,但是首先在頻道中做一種*預留*工作人員可以按我想要的順序完成工作,但是頻道序列化他們的FCFS(而不是FFFS =首先完成,首次服務)。謝謝。 – chrisleague 2011-03-06 18:01:49

+0

保留是一個好詞。 – 2011-03-06 19:40:28

+0

作爲一個方面說明,您並不總是需要deepseq,這會強制評估列表中的每個元素。通常,它足以強制評估列表的脊柱,這可以通過'length xs'seq'xs'來完成 – sclv 2011-03-07 16:22:52