我遇到了與Net :: AMQP :: RabbitMQ和fork()的困惑行爲。如果我...發佈到RabbitMQ悄悄地在父進程中失敗後分叉兒童
- 建立在父進程
- 到RabbitMQ的連接發佈的消息
- 叉一個孩子,並等待它退出(孩子睡覺)
- 發佈消息
......第二條消息實際上並沒有發送到RabbitMQ(並且沒有發生錯誤)。我做了很多測試,包括在發送前檢查$connection->is_connected()
。從我的實驗中獲得一些有趣的花絮:
- 如果我在發佈第二條消息之前嘗試打開新通道,則會掛起
$connection->open_channel($newChannelId)
調用。 - 如果我允許孩子在父母發佈第二條消息的同時繼續運行(並且等到之後再到
waitpid
),那麼它已成功發送。
我正在尋找一種方法來檢測此連接不再有效時,分叉的孩子退出,並強制斷開/重新連接。我將連接緩存在系統中各種其他模塊使用的perl模塊中,我不知道是否/當這些其他模塊fork()
並行工作。我無法可靠地設置$SIG{CHLD}
處理程序,並在收到信號時斷開連接,因爲其他代碼可能會覆蓋我的處理程序。我擁有的唯一防彈選項是放棄緩存併爲每封郵件建立連接,但這會顯着降低發佈速度(減少30倍)。
此腳本演示了這個問題(發佈到稱爲話題交換「廣播」):
#!/usr/bin/perl
use strict;
use Net::AMQP::RabbitMQ;
use JSON -support_by_pp;
my $connection;
my $channelId = 0;
sub sendToRabbit {
my ($message) = @_;
print "Sending message $message->{message}\n";
my $contentType = 'application/json';
my $payload = encode_json $message;
$connection->publish($channelId, 'test.route', $payload, { exchange => 'broadcast', force_utf8_in_header_strings => 1 }, { 'content_type' => $contentType });
print "Sent!\n";
}
sub main {
print "Starting...\n";
$connection = Net::AMQP::RabbitMQ->new();
$connection->connect('localhost', { user => 'guest', password => 'guest', port => 5672 });
$connection->channel_open(++$channelId);
print "Connected!\n";
# send first message
sendToRabbit({ message => 'body 1' });
# fork child
my $child = fork();
if(!$child) {
# child
sleep(1);
print "child exiting...\n";
exit(0);
}
else {
# parent
waitpid($child, 0);
}
print "parent continuing...\n";
# send second message - this will not be actually sent.
sendToRabbit({ message => 'body 2' });
# allow I/O to settle...
sleep(1);
}
main();
編輯:溶液
由於池上有關解決方案的謎底!
在我的RabbitMQ管理對象中,我注入了一些代碼到connect()
例程中,允許我有選擇地跳過分叉子的子句,這些子類本身不會調用connect()
。這似乎具有預期的效果。
# Connect to RabbitMQ and create a channel
sub connect {
my ($self) = @_;
$self->{pid} = $$;
# if we redefined the destructor and connect is called, we need to revert
# it so it can be redefined again properly
no warnings qw(redefine);
if($self->{original_destructor}) {
# reset original destructor
*Net::AMQP::RabbitMQ::DESTROY = $self->{original_destructor};
delete $self->{original_destructor};
}
# define alternate destructor so forked children that do not call "connect" do
# not destroy our connection
{
$self->debug("Overridding constructor...");
$self->{original_destructor} = Net::AMQP::RabbitMQ->can('DESTROY');
# only destroy the connection if the current pid is the owner's pid
my $new_destructor = sub { if($self->{pid} eq $$) { $self->debug("Destroying $_[0]!\n"); $self->{original_destructor}->(); } };
*Net::AMQP::RabbitMQ::DESTROY = $new_destructor;
}
my $connection = Net::AMQP::RabbitMQ->new();
$connection->connect('localhost', { user => $self->{username}, password => $self->{password}, port => $PORT, vhost => $VHOST });
$self->{connection} = $connection;
$self->{channel} = $self->createChannel();
1;
}
謝謝你的快速和詳細的答案...我想這是這樣的東西,但不知道如何驗證它(謝謝你的片段)。 +1。 – Voluntari