2015-09-15 67 views
11

我想在大列表上做一個並行地圖​​。代碼看起來有點像這樣:Elixir Stream中的Task.async

big_list 
|> Stream.map(&Task.async(Module, :do_something, [&1])) 
|> Stream.map(&Task.await(&1)) 
|> Enum.filter filter_fun 

但我檢查流實現而據我瞭解Stream.map結合了功能和應用組合功能的流,這將意味着順序是這樣的內容:

  1. 以第一要素
  2. 創建異步任務
  3. 等待它完成
  4. 採取第二elelemnt ...

在這種情況下,它不會做並行。我是對的還是缺少什​​麼?

如果我是正確的,那這個代碼?

Stream.map Task.async ... 
|> Enum.map Task.await ... 

這是否會平行運行?

+2

閱讀本 - http://www.theerlangelist.com/2015/07/beyond-taskasync.html – emaillenin

回答

9

第二個也不會做你想做的。您可以使用此代碼看得很清楚:

defmodule Test do 
    def test do 
    [1,2,3] 
    |> Stream.map(&Task.async(Test, :job, [&1])) 
    |> Enum.map(&Task.await(&1)) 
    end 

    def job(number) do 
    :timer.sleep 1000 
    IO.inspect(number) 
    end 
end 

Test.test 

你會看到一個數字,然後1周秒鐘的等待,另一個號碼,等等。這裏的關鍵是你想盡快創建任務,所以你根本不應該使用 懶惰Stream.map。而是使用急於Enum.map在這一點上:

|> Enum.map(&Task.async(Test, :job, [&1])) 
|> Enum.map(&Task.await(&1)) 

在另一方面等待當你做一些急於操作後,像你filter可以使用Stream.map,只要。這樣,等待將會穿插任何你可能對結果進行的處理。

4

藥劑1.4提供了新的Task.async_stream/5函數將返回在可枚舉的每一個項目同時運行一個給定函數的流。

還可以使用:max_concurrency:timeout選項參數指定工作人員的最大數量和超時時間。


這會讓你的例子同時運行:

big_list 
|> Task.async_stream(Module, :do_something, [&1]) 
|> Enum.filter(filter_fun) 
0

你可以試試Parallel Stream

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end) 
stream |> Enum.into([]) 
[2,4,6,8,10,12,14,16,18,20] 

UPD 或者更好地利用Flow