2015-06-26 52 views
9

我目前在elixir中並行處理列表問題。並行性的原因是我將結果保存到API中,如果我一次全部激發它們,它會得到DDOS並關閉。Elixir lang並行處理列表

下面的代碼應該將拆分SQL查詢的結果並在一個單獨的任務中處理每一行,並且當所有任務完成後,它應該終止。

發生什麼事是第一個任務在發出消息後導致腳本終止。我已經看到了答案,他們把接收函數放在一個函數中,並且該函數一遍又一遍地調用自己,但是我覺得應該有另一種更好的方法來處理這個問題。

results = Enum.chunk(results, 500) 

# Give this process a name 
Process.register(self(), :core) 

# Loop over the chunks making a process for each 
Enum.each results, fn(result) -> 
    task = Task.async(fn -> Person.App.process(result, "Test", "1") end) 
end 

# And listen for messages 
receive do 
    {:hello, msg} -> IO.inspect msg 
    {:world, _} -> "won't match" 
end 

回答

12

當使用Task.async它是最方便的獲得與Task.await結果:

results 
|> Enum.map(fn result -> Task.async(fn -> Person.App.process(result, "Test", "1") end) end) 
|> Enum.map(&Task.await/1) 
|> Enum.each(&IO.inspect/1) 

事實上,如果你爲async結果仍然會發送到進程不await那稱爲async並存儲在其郵箱中,可能導致內存泄漏!如果您的意圖是創建一個Task,您不關心結果,請改用Task.startTask.start_link

+1

當我將Task.async更改爲Task.start_link時,主進程在行完成處理之前退出。在所有任務完成之前,如何讓腳本保持活動狀態? –

+0

我認爲你應該在答案中使用該方法 - 對所有任務(這是Enum.map(&Task.await/1)的作用)繼續使用Task.async和Task.await。該行的結果將是列表中所收集任務的所有結果 - 然後您可以將它們打印出來(答案中爲「Enum.each(&IO.inspect/1)」)或者用它們做其他事情。 –

+0

好酷很適合我!我必須改變任務等待進入超時,儘管有時任務需要10-20秒Enum.map(&Task.await(&1,20000)) –