1

我正在尋找一種方法來並行地在2維陣列上映射函數f :: a -> IO(b),同時保留合理的內存消耗。
我也希望能夠提供數組索引作爲函數的參數,即映射g :: Int -> Int -> a -> IO(b),如來自Data.Vector的imap或來自Data.Map的mapWithKey
當前的嘗試(見下文)要麼有可怕的內存消耗,要麼在運行時拋出錯誤。陣列上的非確定性函數的異步映射

請注意,實際上,我感興趣的函數的類型是h :: Int -> Int -> a -> Random b,其中Random表示來自Control.Monad.Random的一些Random monad;我使用evalRandIO將其移至IO monad。


嘗試的解決方案:

說我希望功能foo :: Int -> Int -> a -> IO(b)超過a類型的元素的2D陣列映射。 (這裏ab是特定類型的,沒有隱式全稱量化。)
到目前爲止,我曾嘗試以下方法:

與Control.Concurrent.Async

import Control.Concurrent.Async(mapConcurrently) 

indexedArray :: [[(Int,Int,a)]] 
indexedArray = -- ... 
mappedArray = mapConcurrently (traverse (\(x,y,a) -> foo x y a)) indexedArray 
  1. 平原名單

這種方法的問題是內存消耗不在圖表中(例如4GB作爲參考)。
正如答案中所指出的,採用這種方法,我只是並行地評估行而不是所有元素,但在實踐中這對我沒有太大的影響。

  • 惹巴

    import qualified Data.Array.Repa as R 
    import Data.Array.Repa(Z(..), (:.)(..), U, DIM2) 
    
    array :: R.Array U DIM2 a 
    array = -- ... 
    mappedArray = R.traverse array id (\i (Z :. x :. y) -> unsafePerformIO $ foo x y (i (Z :. x :. y))) 
    result = R.computeP mappedArray 
    
  • 注意R.traverseData.Traversable(traverse)。由於Repa陣列不支持Data.Traversable(traverse),因此我無法以任何方式對IO操作進行排序,因此我必須使用unsafePerformIO才能使用內置的"traverse"功能。
    這種方法具有良好的性能和出色的內存消耗(大約50MB供參考)。
    但是,有一個問題,因爲我始終得到以下運行時錯誤:thread blocked indefinitely in an MVar operation

    3a。 Data.Vector和Control.Parallel

    與Repa基本相同的方法導致相同的錯誤,thread blocked indefinitely in an MVar operation
    我再次使用unsafePerformIO作爲Data.Vector矢量沒有可遍歷的實例。

    import qualified Data.Vector as V 
        import Control.Parallel.Strategies(using) 
        import Data.Vector.Strategies(parVector) 
    
        array :: V.Vector (V.Vector a) 
        array = -- ... 
        mappedArray = V.imap (\ y row -> V.imap (\x a -> unsafePerformIO $ foo x y a) row) `using` (parVector 1) 
    

    與Repa相比,內存消耗和性能略差(約100MB供參考),但仍保持可比性。

    3b。 Data.Vector和Control.Concurrent.Async

    正如shellll所建議的,但使用平面向量而不是嵌套向量。

    import qualified Data.Vector.Unboxed as V 
        import qualified Data.Vector.Unboxed.Mutable as M 
        import Control.Concurrent.Async(forConcurrently_) 
    
        mappedFlattenedArray = do 
         flattenedMArray <- V.unsafeThaw $ -- ... 
         forConcurrently_ [0..w*h] (\i -> do v <- M.unsafeRead flattenedMArray i 
                  let (y,x) = quotRem i w 
                  v' <- foo x y v 
                  M.unsafeWrite flattenedMArray i v') 
         V.unsafeFreeze flattenedMArray 
    

    不幸的是,這種方法(〜3GB)的內存消耗非常高。我認爲這是因爲forConcurrently_創建了很多thunk?我不知道如何避免這個問題。

  • Data.Array和Control.Concurrent.Async
  • 使用Data.Array陣列的橫越例如,如所建議的艾力:

    import qualified Data.Array.Unboxed as A 
        import Control.Concurrent.Async(mapConcurrently) 
    
        indexedArray :: A.Array (Int,Int) ((Int,Int),a) 
        indexedArray = -- ... 
        mappedArray = mapConcurrently (\((x,y),a) -> foo x y a) indexedArray 
    

    再一次,內存消耗非常高(〜3GB),即使使用unboxed陣列也是如此;該問題可能與方法1和方法3b相同,其中thunk的累積消耗大量內存。我不知道如何解決它。


    的整體性能和內存消耗似乎是惹巴優於任何其它方法,我也很欣賞內置的功能與2維數組處理,並能夠映射使用索引功能。不幸的是,大部分時間我都會遇到上述的運行時錯誤(但並非總是如此!)。

    我早些時候說過,foo的返回類型爲IO(b)的唯一原因是因爲非確定性。所以我會想我可以改變輸出類型爲一些Random monad,而不是做unsafePerformIO我可以簡單地使用給定的種子執行runRandom。不幸的是,這並沒有解決問題,因爲我始終得到錯誤thread blocked indefinitely in an MVar operation

    有沒有什麼辦法可以打撈方法2(Repa)來規避這個錯誤?或者還有其他適用的方法嗎? 我知道一般情況下,IO必然會破壞並行性,因爲不能保證IO操作不會發生衝突,但至少對於這種用例我相信應該有可能的解決方案。 (請參閱:Why is there no mapM for repa arrays?

    另請參閱以下問題:Parallel mapM on Repa arrays。但是請注意,我不知道我的函數foo需要多少個隨機數。

    +0

    你讀過'unsafePerformIO'的文檔嗎?你能告訴我們'富'嗎? – jberryman

    +0

    這個問題似乎是基於對IO類型在系統中作用的誤解。如果你有一個函數'A - > IO B',你知道'沒有副作用,那麼你應該可以重寫它以使'IO(A - > B)'類型。如果你不能這樣做,那麼你的功能確實有副作用。如果隨機性是你唯一的副作用,那麼[this](https://hackage.haskell.org/package/MonadRandom-0.5.1/docs/Control-Monad-Random-Class.html#t:MonadInterleave)應該是有幫助的(特別是,文檔說:「這可以用於,例如,允許隨機計算並行運行」) – user2407038

    回答

    3

    你的第一種方法可能是你想要的,但不是一個鏈表。注意類型mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)允許你這樣做相當於一個平行traverse超過任何Traversable,包括Array(我建議Array超過Vector這裏只是因爲它更適合於多個維度)。

    import Control.Concurrent.Async (mapConcurrently) 
    import Data.Array 
    
    indexedArray :: Array (Int,Int) (Int,Int,a) 
    indexedArray = ... 
    
    mappedArray = mapConcurrently (\(x,y,a) -> foo x y a) indexedArray 
    

    另外請注意,你以前的嵌套列表方法僅並行每個子表的traverse - 它沒有paralellize整個事情。

    +0

    謝謝,我沒有意識到Data.Array數組是可以遍歷的,因爲Data.Vector向量和Repa數組很遺憾不是。 – Will

    +0

    @ will Data.vector也可以穿越 - 只是不是無箱版 – Alec

    +0

    噢好吧,我沒有意識到。 你有任何建議來緩解(我認爲是)thunk積累的問題?正如我在(更新的)問題中所解釋的,所有使用異步的版本似乎至少使用了20倍的內存量。 – Will

    1

    爲了獲得最大的性能和緊張的內存佈局,沒有任何不必要的數組拷貝,我建議使用Data.Vector.Storable.Mutable

    一個罐thaw/unsafeThaw任何不可變的載體(例如Data.Vector.Storable)取回一可變矢量,它支持在Data.Vector.Storable.Mutable定義的操作,如 readwrite,並且其一元行動PrimMonad約束,PrimMonad是基本monad,如IOST

    例如,write的簽名是:

    (PrimMonad m, Storable a) => MVector (PrimState m) a -> Int -> a -> m() 
    

    只要看看在documentation for convertion to/from a mutable vector

    這似乎令人生畏,但它實際上是非常簡單的:MVector (PrimState m) a就是你從thawm可能是STIOPrimState ms如果mST sReadWorld如果mIO,該Int參數僅元素索引和a是新值。 該函數返回一個動作,其中在位置/破壞性更新給定位置處的矢量的副作用。

    完成時突變的載體,可以freeze/unsafeFreeze它,以獲得一個不變的載體背面, freezeunsafeFreezethawunsafeThaw, 例如相對unsafeFreeze有類型簽名:

    unsafeFreeze :: (Storable a, PrimMonad m) => MVector (PrimState m) a -> m (Vector a) 
    

    正如你可以看到,該函數也返回與PrimMonad約束單子行動,見the documentation of the primitive package瞭解更多詳情。

    現在,爲了實現自己的目標,據我所知,你會unsafeThawoutter載體,然後concurrently(從asyncunsafeThawread,適用foowrite每個元素和最後unsafeFreeze每個載體,然後unsafeFreezeoutter mutable vector。

    請注意,這也可以通過類似的方式用無箱的可變IO陣列完成。

    另請注意,我從您的問題中推斷出,並行性應限制爲outter向量,即所有行應並行執行,而不是所有行中的所有元素。

    +0

    謝謝,我會放棄這一點。 爲了方便起見,我只是逐行進行並行處理,但我並不介意。 – Will

    +0

    我在需要使用本地C庫的應用程序中使用類似的方法; 「Storable」的特點是任何'Storable'的矢量都可以直接用作C-Array,參見[Foreign.Storable](https://www.stackage.org/haddock/lts-8.0/base -4.9.1.0 /外商Storable.html)。 – sheyll