我想寫一個進程,偵聽ActiveMQ並基於消息,出去並從Web服務抓取數據,做一些處理,然後將流程數據放到另一個Web服務中。 (REST/JSON)AnyEvent :: STOMP :: Client + AnyEvent :: ForkManger =間歇性錯誤
下面的模塊正常工作,直到我跟其中一個不知名的webservices返回錯誤。我已經嘗試了很多東西來捕捉錯誤,但無濟於事。一旦Web服務發生錯誤,雖然我得到以下信息:事件
未處理的異常回調(MESSAGE, AnyEvent :: STOMP ::客戶端= HASH(0x3ad5e48),HASH(0x3a6bbb0) { 「行動」: 「創造」, 「數據」:{ 「ID」:40578737, 「類型」: 「警報」, 「誰」:空}, 「GUID」: 「ADCCEE0C-73A7-11E6-8084-74B346D1CA67」, 「主機名」: 「MYSERVER」, 「PID」:48632}): $ fork_manager-> start()方法應該在管理器進程中調用
OK,我在概念理解,子進程正在嘗試啓動另一個進程和叉子經理說這是一個不行。但是,考慮到下面的模塊,啓動新進程來處理長時間運行的處理的正確方法是什麼。或者爲什麼一個子進程死亡造成這種例外,我怎樣才能防止這種
這裏的模塊(精簡)
package consumer;
use AnyEvent::ForkManager;
use AnyEvent::STOMP::Client;
use JSON;
use Data::Dumper;
use v5.18;
use Moose;
sub run {
my $self = shift;
my $pm = AnyEvent::ForkManager->new(max_workers => 20);
my $stomp = AnyEvent::STOMP::Client->new();
$stomp->connect();
$stomp->on_connected(sub {
my $stomp = shift;
$stomp->subscribe('/topic/test');
say "Connected to STOMP";
});
$pm->on_start(sub {
my ($pm,$pid,@params) = @_;
say "Starting $pid worker";
});
$pm->on_finish(sub {
my ($pm, $pid,@params) = @_;
say "Finished $pid worker";
});
$pm->on_error(sub {
say Dumper(\@_);
});
$stomp->on_message(sub {
my ($stomp, $header, $body) = @_;
my $href = decode_json $body;
$pm->start(cb => sub {
my ($pm, @params) = @_;
$self->process(@params);
},
args => [ $href->{id}, $href->{data}->{type}, $href->{data}->{who} ],
);
});
my $cv = AnyEvent->condvar;
$cv->recv;
}
sub process {
say "Processing ".Dumper(\@_);
sleep 5;
if (int(rand(10)) < 5) {
die "OOPS"; # this triggers the error message above
}
say "Done Processing $_[1]";
}
1;
繼承人以上模塊的驅動程序:
#!/usr/bin/env perl
use v5.18;
use lib '.';
use consumer;
my $c = consumer->new();
$c->run;
最後一個流量發生器,你可以用它來看到這個行動:
#!/usr/bin/env perl
use lib '../lib';
use lib '../../lib';
use v5.18;
use Data::Dumper;
use JSON;
use Net::STOMP::Client;
$ENV{'scot_mode'} = "testing";
my $stomp = Net::STOMP::Client->new(
host => "127.0.0.1",
port => 61613
);
$stomp->connect();
for (my $i = 1; $i < 1000000; $i++) {
my $href = {
id => $i,
type => "event",
what => "foo",
};
my $json = encode_json $href;
say "Sending ".Dumper($href);
$stomp->send(
destination => "/topic/test",
body => $json,
);
}
$stomp->disconnect();