2016-01-18 39 views
3

我正在實現分佈式cronjob執行系統(所謂的cron計算集羣)。當動作時間在那裏時,Cronjobs應該排隊進入消息隊列(RabbitMQ)。另一方面(集羣的節點/工作者)是一個Perl守護進程,它利用AnyEvent::RabbitMQ從消息隊列中完全接收一個cronjob /任務/消息,處理任務並從消息隊列請求另一個恰好一個cronjob /任務/消息等等。Howto如何處理Perl中的AnyEvent,RabbitMQ(心跳)和長時間運行的作業?

我使用RabbitMQ的心跳功能,該功能與AnyEvent::RabbitMQ一起實現,以幫助RabbitMQ識別斷開的連接。

沒關係心跳間隔的實際值!我也有很長時間的運行工作需要幾天時間。因此,將時間間隔設置爲最長的cronjob所需的時間並不是一種選擇。

請參閱下面的代碼片段來執行Perl守護進程worker中的實際cronjob。它在'AnyEvent-> timer'內實現,不會對RabbitMQ做消息處理。由於RabbitMQ的consume被禁止(由管理層)使用此方法。

sub _timer_tick { 

    $rabbitmq_channel->get(
    queue  => 'job_queue', 
    on_success => sub { 
     my ($amqp_method) = @_; 
     if (not $amqp_method->{empty}) { 
     pause_timer(); 
     progress_job($amqp_method); 
     resume_timer(); 
     } 
    }, 
    on_failure => sub { $quit_programm->send('RABBITMQ_ERROR', @_) }, 
); 

    return; 
} 

progress_job()是消息被解析並執行作業的地方。 pause_timer()resume_timer()控制觸發_timer_tick()AnyEvent->timer

use Capture::Tiny 'capture'; 
sub progress_job { 
    my ($amqp_method) = @_; 
    my $job = decode_json($amqp_method->{body}->to_raw_payload()); 
    my ($stdout, $stderr, $exit) = capture { 
    system $job->{execute}; 
    }; 
    return; 
} 

第一個長期運行的工作中去,系統「崩潰」與各種錯誤消息。有時會拋出'未知頻道ID:1',有時會拋出'頻道已關閉'。所以我做了'愚蠢的調試'(試圖弄亂配置),發現當heartbeat的間隔比progress_job()所用的時間短時,將會拋出這些錯誤。經過一番考慮後纔有意義。 progress_job()是一個阻塞子程序,AnyEvent無法繼續向RabbitMQ發送心跳包。

我首先想到的解決阻塞問題的方法是在子進程中分叉並執行progress_job()AnyEvents documentation on FORK指出,如果孩子內沒有對事件系統的訪問(例如通過AnyEvent),則使用fork保存。 接下來想到:好吧,沒有訪問事件系統,所以我可以做叉子。 但是:定時器應該恢復(resume_timer())後progress_job()已返回。理論上resume_timer()將在fork()之後被調用,而不是在progress_job()之後返回。所以我停止了我的實施。

我的問題:如何解決最後一點? progress_job()(或換句話說,分叉的孩子)返回後如何resume_timer()? 由於分叉和事件系統不是線程安全的,我不能將resume_timer()放在孩子的內部。

回答

3

AE不能處理事件,除非程序使用AE感知呼叫阻塞。 system不是AE感知的。改爲使用AnyEvent::Utilrun_cmd

+0

'$ run_cmd_cv-> recv'給了我'256'而不是預期的'1'。命令'perl -E'exit 1'; echo $?'也回聲'1'。如何接收執行命令的實際錯誤代碼? – burnersk

+1

bash的'$?'不同於Perl的'$?'。參見['system'](http://perldoc.perl.org/functions/system.html)。 – ikegami

+0

謝謝你指出。我添加了'$ exit = $ exit >> 8,如果$ exit && $ exit> 255;'現在它可以工作。 – burnersk

相關問題