2014-02-11 43 views
4

我一直在試驗新的管道-http包,我有一個想法。我有兩個網頁解析器,一個返回頁面中的行項目和另一個數字。當我抓取頁面時,將這些解析器串在一起並同時從相同的字符串生成器獲取它們的結果是非常好的,而不是獲取頁面兩次或者將所有html讀入內存並解析它兩次。將兩個消費者加入到返回多個值的單個消費者中?

換句話說,假設您有兩個消費者:

c1 :: Consumer a m r1 
c2 :: Consumer a m r2 

是否有可能作出這樣的功能:

combineConsumers :: Consumer a m r1 -> Consumer a m r2 -> Consumer a m (r1, r2) 
combineConsumers = undefined 

我已經嘗試了一些東西,但我不能想辦法。我明白如果這是不可能的,但它會很方便。

編輯:

對不起原來我是在對管道-attoparsec的假設,因爲我與管道-attoparsec的經驗,使我問錯了問題。當我假設它會返回一個管道消費者時,Pipes-attoparsec將一個attoparsec變成一個管道解析器。這意味着我實際上不能將兩個attoparsec解析器轉換爲接受文本並返回結果的消費者,然後將它們與普通的舊管道生態系統一起使用。我很抱歉,但我不明白管道解析。

即使這對我沒有幫助,但Arthur的答案與我提出問題時的設想非常相似,未來我可能會最終使用他的解決方案。在此期間,我只是要使用導管。

+2

您能鏈接到一個或兩個消費者作爲例子嗎?我熟悉的大部分管道代碼並不使用消費者的基值,並不是用這種方式。另一方面,如果您不使用基礎價值,那麼將兩個消費者拉到一起就是微不足道的。 – Davorak

回答

2

由於達沃拉克在他的評論中提到的原因,我認爲你對此的看法有些不對。但是如果你真的需要這樣一個功能,你可以定義它。

import Pipes.Internal 
import Pipes.Core 

zipConsumers :: Monad m => Consumer a m r -> Consumer a m s -> Consumer a m (r,s) 
zipConsumers p q = go (p,q) where 
    go (p,q) = case (p,q) of 
    (Pure r  , Pure s)  -> Pure (r,s) 
    (M mpr  , ps)   -> M (do pr <- mpr 
             return (go (pr, ps))) 
    (pr   , M mps)  -> M (do ps <- mps 
             return (go (pr, ps))) 
    (Request _ f, Request _ g) -> Request() (\a -> go (f a, g a)) 
    (Request _ f, Pure s)  -> Request() (\a -> do r <- f a 
                 return (r, s)) 
    (Pure r  , Request _ g) -> Request() (\a -> do s <- g a 
                 return (r,s)) 
    (Respond x _, _   ) -> closed x 
    (_   , Respond y _) -> closed y 

如果你是「拉拉鍊」消費者不使用它們的返回值,只有他們的「效果」,你可以只使用tee consumer1 >-> consumer2

0

消費者形成了一個單子,以便

combineConsumers = liftM2 (,) 

將類型檢查。不幸的是,語義可能不像你所期望的那樣:第一個消費者會跑完成,然後跑到第二個。

1

在管道界,值得一提的是,相關功能是zipSinks。可能有某種方法可以使此功能適用於管道,但自動終止可能會阻礙。

+1

那很方便。考慮到attoparsec-conduit的存在,也許我應該考慮在這種情況下使用導管。 –

3

它的結果是「monoidal」,您可以使用Pipes前奏中的tee函數,並結合使用WriterT

{-# LANGUAGE OverloadedStrings #-} 

import Data.Monoid 
import Control.Monad 
import Control.Monad.Writer 
import Control.Monad.Writer.Class 
import Pipes 
import qualified Pipes.Prelude as P 
import qualified Data.Text as T 

textSource :: Producer T.Text IO() 
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah" 

counter :: Monoid w => T.Text 
        -> (T.Text -> w) 
        -> Consumer T.Text (WriterT w IO)() 
counter word inject = P.filter (==word) >-> P.mapM (tell . inject) >-> P.drain 

main :: IO() 
main = do 
    result <-runWriterT $ runEffect $ 
     hoist lift textSource >-> 
     P.tee (counter "foo" inject1) >-> (counter "bar" inject2) 
    putStrLn . show $ result 
    where 
    inject1 _ = (,) (Sum 1) mempty 
    inject2 _ = (,) mempty (Sum 1) 

更新:正如評論所說,真正的問題,我看到的是,在pipes解析器不Consumers。如果他們對剩菜有不同的行爲,你怎麼能同時運行兩個分析器?如果其中一個解析器想要「取消」某些文本,而另一個解析器不需要,會發生什麼?

一種可能的解決方案是在不同的線程中以真正的併發方式運行解析器。 pipes-concurrency包中的基元允許您通過將相同的數據寫入兩個不同的郵箱來「複製」Producer。然後每個解析器都可以用自己的製作者副本來做任何事情。下面是它也使用了pipes-parsepipes-attoparsecasync包的例子:

{-# LANGUAGE OverloadedStrings #-} 

import Data.Monoid 
import qualified Data.Text as T 
import Data.Attoparsec.Text hiding (takeWhile) 
import Data.Attoparsec.Combinator 
import Control.Applicative 
import Control.Monad 
import Control.Monad.State.Strict 
import Pipes 
import qualified Pipes.Prelude as P 
import qualified Pipes.Attoparsec as P 
import qualified Pipes.Concurrent as P 
import qualified Control.Concurrent.Async as A 

parseChars :: Char -> Parser [Char] 
parseChars c = fmap mconcat $ 
    many (notChar c) *> many1 (some (char c) <* many (notChar c)) 

textSource :: Producer T.Text IO() 
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah" 

parseConc :: Producer T.Text IO() 
      -> Parser a 
      -> Parser b 
      -> IO (Either P.ParsingError a,Either P.ParsingError b) 
parseConc producer parser1 parser2 = do 
    (outbox1,inbox1,seal1) <- P.spawn' P.Unbounded 
    (outbox2,inbox2,seal2) <- P.spawn' P.Unbounded 
    feeding <- A.async $ runEffect $ producer >-> P.tee (P.toOutput outbox1) 
               >->  P.toOutput outbox2 
    sealing <- A.async $ A.wait feeding >> P.atomically seal1 >> P.atomically seal2 
    r <- A.runConcurrently $ 
     (,) <$> (A.Concurrently $ parseInbox parser1 inbox1) 
      <*> (A.Concurrently $ parseInbox parser2 inbox2) 
    A.wait sealing 
    return r 
    where 
    parseInbox parser inbox = evalStateT (P.parse parser) (P.fromInput inbox) 

main :: IO() 
main = do 
    (Right a, Right b) <- parseConc textSource (parseChars 'o') (parseChars 'a') 
    putStrLn . show $ (a,b) 

結果是:

("oooo","aa") 

我不知道這種做法有多大的開銷介紹。

+0

不幸的是我的結果不是monoidal。我想可以使用狀態。我想我也可以使用iorefs。 –

+0

@mindreader您始終可以使用'Data'中的'First'和'Last'。「Monoid」可以使任何類型的「變成monoid」。我看到的真正問題是,在「管道」中,「Parsers」不是「消費者」。另外,在並行運行兩個解析器時剩餘的處理可能會非常棘手。 – danidiaz

2

慣用的解決方案是重寫Consumer S作爲從一個FoldFoldMfoldl庫,然後使用Applicative風格進行組合。然後,您可以將此合併摺疊轉換爲適用於管道的合併摺疊。

讓我們假設你要麼有兩個Fold S:

fold1 :: Fold a r1 
fold2 :: Fold a r2 

...或兩個FoldM S:

foldM1 :: Monad m => FoldM a m r1 
foldM2 :: Monad m => FoldM a m r2 

然後你這些組合成使用Applicative風格單一Fold/FoldM

import Control.Applicative 

foldBoth :: Fold a (r1, r2) 
foldBoth = (,) <$> fold1 <*> fold2 

foldBothM :: Monad m => FoldM a m (r1, r2) 
foldBothM = (,) <$> foldM1 <*> foldM2 

-- or: foldBoth = liftA2 (,) fold1 fold2 
--  foldMBoth = liftA2 (,) foldM1 foldM2 

您可以將摺疊變成Pipes.Prelude風格摺疊或Parser。以下是必要的轉換功能:

import Control.Foldl (purely, impurely) 
import qualified Pipes.Prelude as Pipes 
import qualified Pipes.Parse as Parse 

purely Pipes.fold 
    :: Monad m => Fold a b -> Producer a m() -> m b 

impurely Pipes.foldM 
    :: Monad m => FoldM m a b -> Producer a m() -> m b 

purely Parse.foldAll 
    :: Monad m => Fold a b -> Parser a m r 

impurely Parse.foldMAll 
    :: Monad m => FoldM a m b -> Parser a m r 

的原因purelyimpurely功能是使得foldlpipes可以在沒有任何一個引起對其他的依賴性進行互操作。此外,它們還允許pipes(如conduit)以外的庫在不依賴的情況下重用foldl(提示提示,@MichaelSnoyman)。

我很抱歉,此功能沒有記錄,這主要是因爲我花了一段時間來弄清楚如何獲得pipesfoldl在無依賴的方式實現互操作,這是在我寫的pipes教程。我會更新教程來指出這個竅門。

要學習如何使用foldl,請在主模塊中閱讀the documentation。這是一個非常小且易於學習的圖書館。

相關問題