2015-03-19 62 views
6

我開始學習藥劑,並遇到了一個難以解決的挑戰。Enumerable/Stream展望未來

我試圖創建需要Enumerable.t並返回另一個Enumerable.t,其中包括下一個n項目的函數。它與Enum.chunk(e,n,1,[])的行爲稍有不同,因爲數字迭代計數總是等於原始可枚舉計數。我還需要支持流

@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t 

這是最好的文檔測試語法內所示:

iex> lookahead(1..6, 1) |> Enum.to_list 
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]] 

iex> lookahead(1..4, 2) |> Enum.to_list 
[[1,2,3],[2,3,4],[3,4],[4]] 

iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5) 
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]] 

iex> {:ok,io} = StringIO.open("abcd") 
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list 
[["a","b","c"],["b","c","d"],["c","d"],["d"]] 

我已經調查落實Enumerable.t協議,但還沒有完全理解Enumerable.reduce接口。

有沒有簡潔/優雅的方式來做到這一點?

我的用例是針對二進制流中的一個小的固定n值(1或2),因此優化版本的額外點。但是,爲了學習Elixir,我對許多用例的解決方案感興趣。性能很重要。我將在解決方案和發佈的各種n值上運行一些基準。

基準更新 - 2015年4月8日

6級可行的解決方案已經公佈。基準的細節可在https://gist.github.com/spitsw/fce5304ec6941578e454獲得。基準在500個不同的n值列表上運行。

對於n = 1以下結果:

PatrickSuspend.lookahead 104.90 µs/op 
Warren.lookahead   174.00 µs/op 
PatrickChunk.lookahead  310.60 µs/op 
PatrickTransform.lookahead 357.00 µs/op 
Jose.lookahead    647.60 µs/op 
PatrickUnfold.lookahead  1484000.00 µs/op 

對於n = 50以下結果:

PatrickSuspend.lookahead 220.80 µs/op 
Warren.lookahead   320.60 µs/op 
PatrickTransform.lookahead 518.60 µs/op 
Jose.lookahead    1390.00 µs/op 
PatrickChunk.lookahead  3058.00 µs/op 
PatrickUnfold.lookahead  1345000.00 µs/op (faster than n=1) 
+0

我覺得你的最後一個例子應該返回三個元素的列表,對不對? – 2015-03-19 13:39:32

+0

是的,@PatrickOscity,最後一個例子應該返回3個元素。我現在糾正了這個例子。 – Warren 2015-03-19 21:39:25

回答

5

正如在評論中所討論的,我的第一次嘗試有一些性能問題,並且不適用於具有副作用的流,例如IO流。我走上深入挖掘流庫的時間,終於想出了這個解決方案:

defmodule MyStream 
    def lookahead(enum, n) do 
    step = fn val, _acc -> {:suspend, val} end 
    next = &Enumerable.reduce(enum, &1, step) 
    &do_lookahead(n, :buffer, [], next, &1, &2) 
    end 

    # stream suspended 
    defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do 
    {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)} 
    end 

    # stream halted 
    defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do 
    {:halted, acc} 
    end 

    # initial buffering 
    defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do 
    case next.({:cont, []}) do 
     {:suspended, val, next} -> 
     new_state = if length(buf) < n, do: :buffer, else: :emit 
     do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun) 
     {_, _} -> 
     do_lookahead(n, :emit, buf, next, {:cont, acc}, fun) 
    end 
    end 

    # emitting 
    defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do 
    case next.({:cont, []}) do 
     {:suspended, val, next} -> 
     do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun) 
     {_, _} -> 
     do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun) 
    end 
    end 

    # buffer empty, halting 
    defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do 
    {:halted, acc} 
    end 
end 

這可以看第一個令人生畏,但實際上它並不難。我會盡力爲你分解它,但是用這樣一個完整的例子很難。

讓我們從一個更簡單的例子開始:代替無限循環重複給定的值。爲了發射流,我們可以返回一個將累加器和函數作爲參數的函數。爲了發出一個值,我們使用兩個參數調用該函數:要發射的值和累加器。 acc累加器是一個由命令(:cont,:suspend:halt)組成的元組,並告訴我們消費者希望我們做什麼;我們需要返回的結果取決於操作。如果流應該被掛起,我們返回原子:suspended的三元素元組,累加器和枚舉繼續時將調用的函數(有時稱爲「繼續」)。對於:halt命令,我們只需返回{:halted, acc};對於:cont,我們通過執行上述遞歸步驟來發出值。整個事情看起來像這樣:

defmodule MyStream do 
    def repeat(val) do 
    &do_repeat(val, &1, &2) 
    end 

    defp do_repeat(val, {:suspend, acc}, fun) do 
    {:suspended, acc, &do_repeat(val, &1, fun)} 
    end 

    defp do_repeat(_val, {:halt, acc}, _fun) do 
    {:halted, acc} 
    end 

    defp do_repeat(val, {:cont, acc}, fun) do 
    do_repeat(val, fun.(val, acc), fun) 
    end 
end 

現在,這只是謎題的一部分。我們可以發射流,但是我們不處理流入流。再次,爲了解釋如何工作,構建一個更簡單的例子是有意義的。在這裏,我將構建一個函數,該函數接受一個枚舉值,併爲每個值暫停和重新發射。

defmodule MyStream do 
    def passthrough(enum) do 
    step = fn val, _acc -> {:suspend, val} end 
    next = &Enumerable.reduce(enum, &1, step) 
    &do_passthrough(next, &1, &2) 
    end 

    defp do_passthrough(next, {:suspend, acc}, fun) do 
    {:suspended, acc, &do_passthrough(next, &1, fun)} 
    end 

    defp do_passthrough(_next, {:halt, acc}, _fun) do 
    {:halted, acc} 
    end 

    defp do_passthrough(next, {:cont, acc}, fun) do 
    case next.({:cont, []}) do 
     {:suspended, val, next} -> 
     do_passthrough(next, fun.(val, acc), fun) 
     {_, _} -> 
     {:halted, acc} 
    end 
    end 
end 

第一句設置的是被向下傳遞到do_passthrough功能next功能。它用於從傳入流獲取下一個值。內部使用的step函數定義我們暫停流中的每個項目。除了最後一個條款外,其餘部分非常相似。在這裏,我們使用{:cont, []}來調用下一個函數來獲得一個新值並通過case語句處理結果。如果有價值,我們會返回{:suspended, val, next},如果沒有,則流停止,我們將其傳遞給消費者。

我希望澄清一些關於如何在Elixir中手動構建流的問題。不幸的是,有很多需要使用流的樣板文件。如果您現在回到lookahead實施,您會看到只有微小的差異,這是實際上有趣的部分。有兩個附加參數:state,其用於區分:buffer:emit步驟,以及buffer,其在初始緩衝步驟中預先填充有n+1項目。在發射階段,當前的緩衝區被髮射,然後在每次迭代中向左移動。當輸入流停止或我們的流直接停止時,我們完成了。


我在這裏留下我原來的答案以供參考:

下面是一個使用Stream.unfold/2根據您的規格發射值 的真實流的解決方案。這意味着您需要在前兩個示例的末尾添加Enum.to_list到 以獲取實際值。

defmodule MyStream do 
    def lookahead(stream, n) do 
    Stream.unfold split(stream, n+1), fn 
     {[], stream} -> 
     nil 
     {[_ | buf] = current, stream} -> 
     {value, stream} = split(stream, 1) 
     {current, {buf ++ value, stream}} 
    end 
    end 

    defp split(stream, n) do 
    {Enum.take(stream, n), Stream.drop(stream, n)} 
    end 
end 

一般的想法是我們保留前面迭代的buf。在每次迭代中,我們發出當前的buf,從流中取一個值並將其附加到buf的末尾。這一直重複,直到buf是空的。

實施例:

iex> MyStream.lookahead(1..6, 1) |> Enum.to_list 
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]] 

iex> MyStream.lookahead(1..4, 2) |> Enum.to_list 
[[1, 2, 3], [2, 3, 4], [3, 4], [4]] 

iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5) 
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]] 
+0

這也是一個很好的解決方案!我認爲,對Stream.drop/1的多次調用也會影響性能,因爲每次向「drop」流添加越來越多的步驟時都會如此。也許基於你的一個解決方案是使用我們可以暫停流的事實。所以你得到你需要的物品並暫停它。 – 2015-03-19 21:50:54

+1

我試着針對IO.stream的上述解決方案。不幸的是IO.streams有一個副作用,在隨後的調用中不會返回相同的項目。所以我認爲拆分功能會導致項目被刪除。我會在問題中添加一個示例。 – Warren 2015-03-20 00:21:45

+0

@JoséValim謝謝你的建議,我會研究這一點並嘗試改進我的答案。在累加器中傳遞流時感覺錯誤。我想這意味着我必須完全手動構建它,或者是否存在幫助我構建低層流構建的函數? – 2015-03-20 05:22:22

1

您應該能夠使用Stream.chunk/4

將看起來像這樣:

defmodule MyMod do 
    def lookahead(enum, amount) do 
    Stream.chunk(enum, amount + 1, 1, []) 
    end 
end 

隨着您的輸入:

iex(2)> MyMod.lookahead(1..6, 1) |> Enum.to_list 
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]] 

iex(3)> MyMod.lookahead(1..4, 2) |> Enum.to_list 
[[1, 2, 3], [2, 3, 4], [3, 4]] 

iex(4)> Stream.cycle(1..3) |> MyMod.lookahead(1) |> Enum.take(5) 
[[1, 2], [2, 3], [3, 1], [1, 2], [2, 3]] 
+0

其實,好奇爲什麼第二個例子沒有最後的[4] .... – hahuang65 2015-03-19 06:35:41

+0

嗯這可能不是一個完美的解決方案。我認爲當使用Stream.chunk時,它會消耗Enumerable的其餘部分... – hahuang65 2015-03-19 06:44:58

+0

填充僅在塊中發生一次,而不是多次。所以你確實不會看到最後一個'[4]'。 – 2015-03-19 07:29:47

3

這裏是一個低效率的實現這樣的功能:

defmodule Lookahead do 
    def lookahead(enumerable, n) when n > 0 do 
    enumerable 
    |> Stream.chunk(n + 1, 1, []) 
    |> Stream.flat_map(fn list -> 
     length = length(list) 
     if length < n + 1 do 
      [list|Enum.scan(1..n-1, list, fn _, acc -> Enum.drop(acc, 1) end)] 
     else 
      [list] 
     end 
     end) 
    end 
end 

它建立在@ hahuang65實施的頂部,除了我們使用Stream.flat_map/2檢查每個發射的長度項目,只要我們檢測到發射的物品變短,就加上丟失的物品。

從頭開始的手寫實現會更快,因爲我們不需要在每次迭代時調用length(list)。上面的實現可能沒有問題,但如果n很小。如果n是固定的,你甚至可以明確地在生成的列表上模式匹配。

+0

這似乎運作良好。儘管如此,該函數在某些情況下確實會返回一些額外的空列表([])。例如lookahead([1,2],1)返回[[1,2],[2],[],[]]。我很享受每一個迴應! – Warren 2015-03-20 00:55:12

1

以下解決方案使用Stream.resource和暫停Enumerable.reduce的能力。所有的例子都通過了。

總之,它使用Enumerable.reduce來建立一個列表。然後在每次迭代中暫掛減速器,刪除列表的頭部,並在列表的尾部添加最新的項目。最後,當reducer被完成或停止時,它會產生流的預告片。所有這些都使用Stream.resource進行協調。

如果使用FIFO隊列而不是每個迭代的列表,這將更有效。

請任何簡化,效率或錯誤

def Module 
    def lookahead(enum, n) when n >= 0 do 
    reducer = fn -> Enumerable.reduce(enum, {:cont, {0, []}}, fn 
     item, {c, list} when c < n -> {:cont, {c+1, list ++ [item]}} # Build up the first list 
     item, {c, list} when c == n -> {:suspend, {c+1, list ++ [item]}} # Suspend on first full list 
     item, {c, [_|list]} -> {:suspend, {c, list ++ [item]}} # Remove the first item and emit 
     end) 
    end 

    Stream.resource(reducer, 
     fn 
     {:suspended, {_, list} = acc , fun} -> {[list], fun.({:cont, acc})} 
     {:halted, _} = result -> lookahead_trail(n, result) # Emit the trailing items 
     {:done, _} = result -> lookahead_trail(n, result) # Emit the trailing items 
     end, 
     fn 
     {:suspended, acc, fun} -> fun.({:halt, acc}) # Ensure the reducer is halted after suspend 
     _ -> 
     end) 
    end 

    defp lookahead_trail(n, acc) do 
    case acc do 
     {action, {c, [_|rest]}} when c > n -> {[], {action, {c-1, rest}}} # List already emitted here 
     {action, {c, [_|rest] = list}} -> {[list], {action, {c-1, rest}}} # Emit the next tail item 
     acc -> {:halt, acc } # Finish of the stream 
    end 
    end 
end 
+0

我真的很喜歡你的方法比較簡潔。豎起大拇指! – 2015-03-23 08:23:08

3

I had started a discussion about my proposed Stream.mutate method on the elixir core mailing list,彼得漢密爾頓提出解決這個問題的另一種方式提供反饋。通過使用make_ref to create a globally unique reference,我們可以創建一個填充流並將其與原始可枚舉連接,以在原始流停止後繼續發射。這個過程既可以配合使用Stream.chunk,這意味着我們需要刪除的最後一步不需要的引用:

def lookahead(enum, n) do 
    stop = make_ref 
    enum 
    |> Stream.concat(List.duplicate(stop, n)) 
    |> Stream.chunk(n+1, 1) 
    |> Stream.map(&Enum.reject(&1, fn x -> x == stop end)) 
end 

我覺得這是最漂亮的解決方案還沒有,從一個語法點。或者,我們可以使用Stream.transform手工打造的緩衝,這是相當類似於我前面提出的手動解決方案:

def lookahead(enum, n) do 
    stop = make_ref 
    enum 
    |> Stream.concat(List.duplicate(stop, n+1)) 
    |> Stream.transform([], fn val, acc -> 
    case {val, acc} do 
     {^stop, []}       -> {[] , []   } 
     {^stop, [_|rest] = buf}    -> {[buf], rest   } 
     {val , buf} when length(buf) < n+1 -> {[] , buf ++ [val] } 
     {val , [_|rest] = buf}    -> {[buf], rest ++ [val]} 
    end 
    end) 
end 

我沒有基準這些解決方案,但我想第二個,雖然略顯笨重,應因爲它不必遍歷每個塊,所以執行得更好一點。

順便說一句,第二個解決方案,可以不寫case語句once Elixir allows to use the pin operator in function heads (probably in v1.1.0)

def lookahead(enum, n) do 
    stop = make_ref 
    enum 
    |> Stream.concat(List.duplicate(stop, n+1)) 
    |> Stream.transform([], fn 
    ^stop, []       -> {[] , []   } 
    ^stop, [_|rest] = buf    -> {[buf], rest   } 
    val , buf when length(buf) < n+1 -> {[] , buf ++ [val] } 
    val , [_|rest] = buf    -> {[buf], rest ++ [val]} 
    end) 
end 
+0

兩個非常好的解決方案。我已經添加了基準。 – Warren 2015-04-08 01:38:25

+0

@Warren看到如何以多種不同的方式解決這個問題真的很有趣。感謝您分享您的基準! – 2015-04-08 06:23:03

0

從沃倫汲取靈感後,我做了這個。基本用法:

ex> {peek, enum} = StreamSplit.peek 1..10, 3 
{[1, 2, 3], #Function<57.77324385/2 in Stream.transform/3>} 
iex> Enum.take(enum, 5) 
[1, 2, 3, 4, 5] 

https://hex.pm/packages/stream_split