我不能評論監督,但我會實施這個作爲與到期項目的隊列。
我已經實現了一些你可以在下面使用的東西。
我把它做成了gen_server;當你創造它時,你給它一箇舊物品的最大年齡。
它的界面是,你可以發送它的項目進行處理,你可以請求沒有出隊的項目它記錄了它接收每個項目的時間。每次接收到要處理的項目時,它都會檢查隊列中的所有項目,並將超出最大年齡的項目出列並捨棄。 (如果您希望始終遵守最大年齡,則可以在返回排隊的項目之前過濾隊列)
您的數據源將數據({process_this, Anything}
)轉換爲工作隊列,並且您的(可能較慢)使用者進程將調用(gimme
)來獲取數據。
-module(work_queue).
-behavior(gen_server).
-export([init/1, handle_cast/2, handle_call/3]).
init(DiscardAfter) ->
{ok, {DiscardAfter, queue:new()}}.
handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
Instant = now(),
Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
Queue2 = queue:in({Instant, Data}, Queue1),
{noreply, {DiscardAfter, Queue2}}.
handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
case queue:is_empty(Queue0) of
true ->
{reply, no_data, State};
false ->
{{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
{reply, {data, Data}, {DiscardAfter, Queue1}}
end.
delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.
too_old(Stamp, Instant, DiscardAfter) ->
delta(Stamp, Instant) > DiscardAfter.
在REPL小演示:
c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),
timer:sleep(11 * 1000),
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.
「工作進程隊列等待數據」,這聽起來不像Erlang。 Erlang的優點之一是產生新進程並在完成工作時終止它們是很便宜的。您可以只計算gen_server中正在運行的工作人員的數量,以避免過載情況。或者更容易:如果他們還在身邊,這些流程是否會自動開槍?然而,多個工作進程只能幫你在SMP機器上工作。 –
優秀點。我最近在大腦上有「等待數據的過程」,但絕對沒有必要在這裏使用它們。你如何看待答案的其餘部分? – ellisbben
我可能不完全瞭解所有內容,但我認爲這解決了另一個問題。我不想承諾固定的DiscardAfter時間間隔。也許另一種說法是,我希望我的處理結果始終反映我所瞭解的最新數據點,而不浪費時間處理過時的數據。但所需的處理量可能會動態變化,並且源的數據速率也可能會有所不同。 – b0fh