2013-11-03 55 views
4

這個問題有點codegolf和很多newb。我在Haskell中使用了很棒的pipes庫,我想分割一個管道以沿多個通道發送相同的數據(做廣播)。 Pipes.Concurrent tutorial建議使用spawn創建郵箱,利用Output的monoid狀態。 例如,我們可以做這樣的事情:Haskell:不使用產卵分裂管道(廣播)

main = do 
(output1, input1) <- spawn Unbounded 
(output2, input2) <- spawn Unbounded 
let effect1 = fromInput input1 >-> pipe1 
let effect2 = fromInput input2 >-> pipe2 
let effect3 = P.stdinLn >-> toOutput (output1 <> output2) 
... 

這是間接通過郵箱真的有必要嗎? 我們可以改爲寫這樣的東西嗎?

main = do 
let effect3 = P.stdinLn >-> (pipe1 <> pipe2) 
... 

以上不編譯,因爲Pipe沒有一個Monoid實例。 這是否有很好的理由? 第一種方法真的是最簡單的方法來分割管道嗎?

回答

8

有兩種方法可以在不使用併發的情況下執行此操作,包括注意事項。

第一種方式是,如果pipe1pipe2只是簡單Consumer s表示死循環一樣:

p1 = for cat f -- i.e. p1 = forever $ await >>= f 
p2 = for cat g -- i.e. p2 = forever $ await >>= g 

...那麼簡單的方法來解決這個問題是這樣寫:

for P.stdinLn $ \str -> do 
    f str 
    g str 

例如,如果p1只是print ing值:

p1 = for cat (lift . print) 

...和p2該值僅僅是寫入句柄:

p2 = for cat (lift . hPutStrLn h) 

...那麼你會像這樣將它們組合起來:

for P.stdinLn $ \str -> do 
    lift $ print str 
    lift $ hPutStrLn h str 

然而,這種簡化僅適用於Consumer平凡的循環。還有另一種更通用的解決方案,即爲管道定義一個ArrowChoice實例。我認爲,基於拉Pipe■不要允許正確的守法情況,但基於推送Pipe就做:

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r } 

instance (Monad m) => Category (Edge m r) where 
    id = Edge push 
    (Edge p2) . (Edge p1) = Edge (p1 >~> p2) 

instance (Monad m) => Arrow (Edge m r) where 
    arr f = Edge (push />/ respond . f) 
    first (Edge p) = Edge $ \(b, d) -> 
     evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b 
     where 
     up() = do 
      (b, d) <- request() 
      lift $ put d 
      return b 
     dn c = do 
      d <- lift get 
      respond (c, d) 

instance (Monad m) => ArrowChoice (Edge m r) where 
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn))) 
     where 
      bef x = case x of 
       Left b -> return b 
       Right d -> do 
        _ <- respond (Right d) 
        x2 <- request() 
        bef x2 
      up() = do 
       x <- request() 
       bef x 
      dn c = respond (Left c) 

這需要一個NEWTYPE,這樣的類型參數的順序來ArrowChoice預期。

如果你不熟悉的術語推式Pipe,它基本上是一個Pipe,從最上游的管道,而不是最下游管開始,他們都具有以下形狀:

a -> Pipe a b m r 

把它想象成一個Pipe,它不能「去」,直到它從上游接收到至少一個值。

這些基於推Pipe s爲「雙」,以傳統的基於拉Pipe S,完全用自己的合成運算和身份:

(>~>) :: (Monad m) 
     => (a -> Pipe a b m r) 
     -> (b -> Pipe b c m r) 
     -> (a -> Pipe a c m r) 

push :: (Monad m) 
     -> a -> Pipe a a m r 

...但單向Pipes API不出口這是默認的。您只能從Pipes.Core獲得這些運營商(你可能想學習該模塊更緊密地建立他們是如何工作的直覺)。該模塊表明,無論基於推送的Pipe S和拉式Pipe s爲更普遍的雙向版本的兩種特殊情況,並瞭解了雙向的情況是,你如何學習,爲什麼他們是彼此的對偶。

一旦有了基於推管的Arrow比如,你可以寫類似:

p >>> bifurcate >>> (p1 +++ p2) 
    where 
    bifurcate = Edge $ pull ~> \a -> do 
     yield (Left a) -- First give `p1` the value 
     yield (Right a) -- Then give `p2` the value 

那麼你可以使用runEdge將其轉換成一個基於拉的管道時,你就完成了。

這種方法有一個主要缺點,那就是你不能自動將基於拉的管道昇級到基於推管的管道(但通常不需要手工操作)。例如,升級Pipes.Prelude.map是一個推式Pipe,你可以這樣寫:

mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r) 
mapPush f a = do 
    yield (f a) 
    Pipes.Prelude.map f 

然後有正確的類型在Arrow被包裹起來:

mapEdge :: (Monad m) => (a -> b) -> Edge m r a b 
mapEdge f = Edge (mapPush f) 

當然,一個更簡單的辦法是剛剛從頭開始寫:

mapEdge f = Edge $ push ~> yield . f 

使用一種最適合你。

其實,我想出了ArrowArrowChoice情況下,正是因爲我試圖回答完全一樣的問題,因爲你:你是怎麼解決這類問題,而無需使用併發?我寫了一個很長的回答大約在另一個堆棧溢出的答案here,在這裏我介紹瞭如何使用這些ArrowArrowChoice實例併發系統提煉成等效純的人更一般的問題。