2013-07-29 60 views
5

我寫了一個程序來計算語料庫中NGrams的頻率。我已經有消耗記號流,併產生一個單一訂單的n元語法功能:Conduit:Multiple Stream Consumers

ngram :: Monad m => Int -> Conduit t m [t] 
trigrams = ngram 3 
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int) 

目前我只能一個流的消費者連接到流源:

tokens --- trigrams --- countFreq 

怎麼辦我將多個流消費者連接到相同的流源? 我想有這樣的事情:

  .--- unigrams --- countFreq 
      |--- bigrams --- countFreq 
tokens ----|--- trigrams --- countFreq 
      '--- ...  --- countFreq 

的加將並行運行

編輯每個消費者: 多虧了切赫,我想出了這個解決方案

spawnMultiple orders = do 
    chan <- atomically newBroadcastTMChan 

    results <- forM orders $ \_ -> newEmptyMVar 
    threads <- forM (zip results orders) $ 
         forkIO . uncurry (sink chan) 

    forkIO . runResourceT $ sourceFile "test.txt" 
         $$ javascriptTokenizer 
         =$ sinkTMChan chan 

    forM results readMVar 

    where 
     sink chan result n = do 
      chan' <- atomically $ dupTMChan chan 
      freqs <- runResourceT $ sourceTMChan chan' 
           $$ ngram n 
           =$ frequencies 
      putMVar result freqs 
+0

你希望當'tokens'產生一個值時,你所有的'grams'都會收到它? –

回答

5

我假設你想讓你所有的水槽接收所有值。

我建議:

  1. 使用newBroadcastTMChan創建一個新的通道Control.Concurrent.STM.TMChan(STM-CHANS)。
  2. 使用此通道爲您的主生產者使用sinkTBMChanData.Conduit.TMChan(stm-conduit)構建接收器。
  3. 對於每個客戶使用dupTMChan創建自己的副本供閱讀。開始一個新的線程,將使用sourceTBMChan讀取此副本。
  4. 收集您的主題結果。
  5. 確保您的客戶可以在生成數據的同時快速讀取數據,否則可能會導致堆溢出。

(我還沒有嘗試過,讓我們知道它是如何工作的。)


更新:你怎麼能收集結果的一種方法是爲每一個消費者線程創建一個MVar 。他們每個人在完成後都會得到putMVar的結果。所有這些MVar上您的主線程takeMVar,因此等待每個線程完成。例如,如果varsMVar的列表,主線程將發出mapM takeMVar vars來收集所有結果。

+0

感謝您的回答,我如何收集結果,如果我用forkIO產生線程? – SvenK

+0

@SvenK我用一個想法更新了答案,如何收集結果。 –

+0

爲什麼TMChan有一個廣播版本,TBMChan沒有,我在哪裏可以找到'newBroadcastTBMChan'? – CMCDragonkai