我正在實現分佈式cronjob執行系統(所謂的cron計算集羣)。當動作時間在那裏時,Cronjobs應該排隊進入消息隊列(RabbitMQ)。另一方面(集羣的節點/工作者)是一個Perl守護進程,它利用AnyEvent::RabbitMQ
從消息隊列中完全接收一個cronjob /任務/消息,處理任務並從消息隊列請求另一個恰好一個cronjob /任務/消息等等。Howto如何處理Perl中的AnyEvent,RabbitMQ(心跳)和長時間運行的作業?
我使用RabbitMQ的心跳功能,該功能與AnyEvent::RabbitMQ
一起實現,以幫助RabbitMQ識別斷開的連接。
沒關係心跳間隔的實際值!我也有很長時間的運行工作需要幾天時間。因此,將時間間隔設置爲最長的cronjob所需的時間並不是一種選擇。
請參閱下面的代碼片段來執行Perl守護進程worker中的實際cronjob。它在'AnyEvent-> timer'內實現,不會對RabbitMQ做消息處理。由於RabbitMQ的consume
被禁止(由管理層)使用此方法。
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if (not $amqp_method->{empty}) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send('RABBITMQ_ERROR', @_) },
);
return;
}
progress_job()
是消息被解析並執行作業的地方。 pause_timer()
和resume_timer()
控制觸發_timer_tick()
的AnyEvent->timer
。
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json($amqp_method->{body}->to_raw_payload());
my ($stdout, $stderr, $exit) = capture {
system $job->{execute};
};
return;
}
第一個長期運行的工作中去,系統「崩潰」與各種錯誤消息。有時會拋出'未知頻道ID:1',有時會拋出'頻道已關閉'。所以我做了'愚蠢的調試'(試圖弄亂配置),發現當heartbeat
的間隔比progress_job()
所用的時間短時,將會拋出這些錯誤。經過一番考慮後纔有意義。 progress_job()
是一個阻塞子程序,AnyEvent無法繼續向RabbitMQ發送心跳包。
我首先想到的解決阻塞問題的方法是在子進程中分叉並執行progress_job()
。 AnyEvents documentation on FORK指出,如果孩子內沒有對事件系統的訪問(例如通過AnyEvent),則使用fork
保存。 接下來想到:好吧,沒有訪問事件系統,所以我可以做叉子。 但是:定時器應該恢復(resume_timer()
)後progress_job()
已返回。理論上resume_timer()
將在fork()
之後被調用,而不是在progress_job()
之後返回。所以我停止了我的實施。
我的問題:如何解決最後一點? progress_job()
(或換句話說,分叉的孩子)返回後如何resume_timer()
? 由於分叉和事件系統不是線程安全的,我不能將resume_timer()
放在孩子的內部。
'$ run_cmd_cv-> recv'給了我'256'而不是預期的'1'。命令'perl -E'exit 1'; echo $?'也回聲'1'。如何接收執行命令的實際錯誤代碼? – burnersk
bash的'$?'不同於Perl的'$?'。參見['system'](http://perldoc.perl.org/functions/system.html)。 – ikegami
謝謝你指出。我添加了'$ exit = $ exit >> 8,如果$ exit && $ exit> 255;'現在它可以工作。 – burnersk