正如在評論中所討論的,我的第一次嘗試有一些性能問題,並且不適用於具有副作用的流,例如IO流。我走上深入挖掘流庫的時間,終於想出了這個解決方案:
defmodule MyStream
def lookahead(enum, n) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_lookahead(n, :buffer, [], next, &1, &2)
end
# stream suspended
defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
{:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
end
# stream halted
defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
{:halted, acc}
end
# initial buffering
defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
new_state = if length(buf) < n, do: :buffer, else: :emit
do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
{_, _} ->
do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
end
end
# emitting
defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
{_, _} ->
do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
end
end
# buffer empty, halting
defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
{:halted, acc}
end
end
這可以看第一個令人生畏,但實際上它並不難。我會盡力爲你分解它,但是用這樣一個完整的例子很難。
讓我們從一個更簡單的例子開始:代替無限循環重複給定的值。爲了發射流,我們可以返回一個將累加器和函數作爲參數的函數。爲了發出一個值,我們使用兩個參數調用該函數:要發射的值和累加器。 acc
累加器是一個由命令(:cont
,:suspend
或:halt
)組成的元組,並告訴我們消費者希望我們做什麼;我們需要返回的結果取決於操作。如果流應該被掛起,我們返回原子:suspended
的三元素元組,累加器和枚舉繼續時將調用的函數(有時稱爲「繼續」)。對於:halt
命令,我們只需返回{:halted, acc}
;對於:cont
,我們通過執行上述遞歸步驟來發出值。整個事情看起來像這樣:
defmodule MyStream do
def repeat(val) do
&do_repeat(val, &1, &2)
end
defp do_repeat(val, {:suspend, acc}, fun) do
{:suspended, acc, &do_repeat(val, &1, fun)}
end
defp do_repeat(_val, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_repeat(val, {:cont, acc}, fun) do
do_repeat(val, fun.(val, acc), fun)
end
end
現在,這只是謎題的一部分。我們可以發射流,但是我們不處理流入流。再次,爲了解釋如何工作,構建一個更簡單的例子是有意義的。在這裏,我將構建一個函數,該函數接受一個枚舉值,併爲每個值暫停和重新發射。
defmodule MyStream do
def passthrough(enum) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_passthrough(next, &1, &2)
end
defp do_passthrough(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_passthrough(next, &1, fun)}
end
defp do_passthrough(_next, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_passthrough(next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_passthrough(next, fun.(val, acc), fun)
{_, _} ->
{:halted, acc}
end
end
end
第一句設置的是被向下傳遞到do_passthrough
功能next
功能。它用於從傳入流獲取下一個值。內部使用的step函數定義我們暫停流中的每個項目。除了最後一個條款外,其餘部分非常相似。在這裏,我們使用{:cont, []}
來調用下一個函數來獲得一個新值並通過case語句處理結果。如果有價值,我們會返回{:suspended, val, next}
,如果沒有,則流停止,我們將其傳遞給消費者。
我希望澄清一些關於如何在Elixir中手動構建流的問題。不幸的是,有很多需要使用流的樣板文件。如果您現在回到lookahead
實施,您會看到只有微小的差異,這是實際上有趣的部分。有兩個附加參數:state
,其用於區分:buffer
和:emit
步驟,以及buffer
,其在初始緩衝步驟中預先填充有n+1
項目。在發射階段,當前的緩衝區被髮射,然後在每次迭代中向左移動。當輸入流停止或我們的流直接停止時,我們完成了。
我在這裏留下我原來的答案以供參考:
下面是一個使用Stream.unfold/2
根據您的規格發射值 的真實流的解決方案。這意味着您需要在前兩個示例的末尾添加Enum.to_list
到 以獲取實際值。
defmodule MyStream do
def lookahead(stream, n) do
Stream.unfold split(stream, n+1), fn
{[], stream} ->
nil
{[_ | buf] = current, stream} ->
{value, stream} = split(stream, 1)
{current, {buf ++ value, stream}}
end
end
defp split(stream, n) do
{Enum.take(stream, n), Stream.drop(stream, n)}
end
end
一般的想法是我們保留前面迭代的buf。在每次迭代中,我們發出當前的buf,從流中取一個值並將其附加到buf的末尾。這一直重複,直到buf是空的。
實施例:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
我覺得你的最後一個例子應該返回三個元素的列表,對不對? – 2015-03-19 13:39:32
是的,@PatrickOscity,最後一個例子應該返回3個元素。我現在糾正了這個例子。 – Warren 2015-03-19 21:39:25