2011-08-15 30 views
3

我有一個數據源可以產生潛在的高速率點,並且我想對每個點執行可能耗時的操作;但我也希望當系統過載時通過丟棄多餘的數據點來優雅地降級。erlang/OTP中限速事件處理程序

據我所知,使用gen_event不會跳過事件。從概念上講,我希望gen_event要做的是在再次運行處理程序之前刪除除最新未決事件外的所有事件。

有沒有辦法用標準的OTP做到這一點?還是有一個很好的理由爲什麼我不應該這樣處理事情?

到目前爲止,我有使用gen_server並依託超時觸發事件昂貴最佳:

-behaviour(gen_server). 
init() -> 
    {ok, Pid} = gen_event:start_link(), 
    {ok, {Pid, none}}. 

handle_call({add, H, A},_From,{Pid,Data}) -> 
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}. 

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}. % set timeout to 0 

handle_info(timeout, {Pid,Data}) -> 
    gen_event:sync_notify(Pid,Data), 
    {noreply, {Pid,Data}}. 

是這種做法是否正確? (尤其是監督?)

回答

0

有沒有辦法與標準的OTP做到這一點?

是有一個很好的理由,爲什麼我不應該處理的事情呀?

不,提前超時可以提高整個系統的性能。請閱讀如何here

此方法是否正確? (特別是在監督方面?)

不知道,你還沒有提供監督代碼。


由於一些額外的信息,你的第一個問題:

如果您可以使用OTP之外的第三方庫,有幾個在那裏,可以添加搶佔超時,這是你是什麼描述。

有兩個我熟悉的第一個是dispcount,第二個是chick(我是小雞的作者,我會盡量不要在這裏宣傳這個項目)。

Dispcount對單一資源非常有用,只有有限數量的作業可以同時運行並且不會排隊。你可以閱讀它here警告很多真正有趣的信息!)。

Dispcount並不適用於我,因爲我不得不產生4000多個進程池來處理我的應用程序中不同隊列的數量。我寫小雞是因爲我需要一種動態增加和減少隊列長度的方法,並且能夠排隊請求並拒絕其他隊列,而不必產生4000多個進程池。

如果我是你,我會先嚐試折扣(因爲大多數解決方案不需要小雞),然後如果你需要更多動態的東西,那麼一個池可以響應一定數量的請求嘗試小雞。

1

我不能評論監督,但我會實施這個作爲與到期項目的隊列。

我已經實現了一些你可以在下面使用的東西。

我把它做成了gen_server;當你創造它時,你給它一箇舊物品的最大年齡。

它的界面是,你可以發送它的項目進行處理,你可以請求沒有出隊的項目它記錄了它接收每個項目的時間。每次接收到要處理的項目時,它都會檢查隊列中的所有項目,並將超出最大年齡的項目出列並捨棄。 (如果您希望始終遵守最大年齡,則可以在返回排隊的項目之前過濾隊列)

您的數據源將數據({process_this, Anything})轉換爲工作隊列,並且您的(可能較慢)使用者進程將調用(gimme)來獲取數據。

-module(work_queue). 
-behavior(gen_server). 

-export([init/1, handle_cast/2, handle_call/3]). 

init(DiscardAfter) -> 
    {ok, {DiscardAfter, queue:new()}}. 

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) -> 
    Instant = now(), 
    Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0), 
    Queue2 = queue:in({Instant, Data}, Queue1), 
    {noreply, {DiscardAfter, Queue2}}. 

handle_call(gimme, From, State = {DiscardAfter, Queue0}) -> 
    case queue:is_empty(Queue0) of 
    true -> 
     {reply, no_data, State}; 
    false -> 
     {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0), 
     {reply, {data, Data}, {DiscardAfter, Queue1}} 
    end. 

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) -> 
    ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1. 

too_old(Stamp, Instant, DiscardAfter) -> 
    delta(Stamp, Instant) > DiscardAfter. 

在REPL小演示:

c(work_queue). 
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).   
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),  
timer:sleep(11 * 1000),             
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}), 
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.   
+2

「工作進程隊列等待數據」,這聽起來不像Erlang。 Erlang的優點之一是產生新進程並在完成工作時終止它們是很便宜的。您可以只計算gen_server中正在運行的工作人員的數量,以避免過載情況。或者更容易:如果他們還在身邊,這些流程是否會自動開槍?然而,多個工作進程只能幫你在SMP機器上工作。 –

+0

優秀點。我最近在大腦上有「等待數據的過程」,但絕對沒有必要在這裏使用它們。你如何看待答案的其餘部分? – ellisbben

+0

我可能不完全瞭解所有內容,但我認爲這解決了另一個問題。我不想承諾固定的DiscardAfter時間間隔。也許另一種說法是,我希望我的處理結果始終反映我所瞭解的最新數據點,而不浪費時間處理過時的數據。但所需的處理量可能會動態變化,並且源的數據速率也可能會有所不同。 – b0fh