2017-02-14 41 views
1

我遇到了與Net :: AMQP :: RabbitMQ和fork()的困惑行爲。如果我...發佈到RabbitMQ悄悄地在父進程中失敗後分叉兒童

  1. 建立在父進程
  2. 到RabbitMQ的連接發佈的消息
  3. 叉一個孩子,並等待它退出(孩子睡覺)
  4. 發佈消息

......第二條消息實際上並沒有發送到RabbitMQ(並且沒有發生錯誤)。我做了很多測試,包括在發送前檢查$connection->is_connected()。從我的實驗中獲得一些有趣的花絮:

  • 如果我在發佈第二條消息之前嘗試打開新通道,則會掛起$connection->open_channel($newChannelId)調用。
  • 如果我允許孩子在父母發佈第二條消息的同時繼續運行(並且等到之後再到waitpid),那麼它已成功發送。

我正在尋找一種方法來檢測此連接不再有效時,分叉的孩子退出,並強制斷開/重新連接。我將連接緩存在系統中各種其他模塊使用的perl模塊中,我不知道是否/當這些其他模塊fork()並行工作。我無法可靠地設置$SIG{CHLD}處理程序,並在收到信號時斷開連接,因爲其他代碼可能會覆蓋我的處理程序。我擁有的唯一防彈選項是放棄緩存併爲每封郵件建立連接,但這會顯着降低發佈速度(減少30倍)。

此腳本演示了這個問題(發佈到稱爲話題交換「廣播」):

#!/usr/bin/perl 
use strict; 
use Net::AMQP::RabbitMQ; 
use JSON -support_by_pp; 

my $connection; 
my $channelId = 0; 

sub sendToRabbit { 
    my ($message) = @_; 
    print "Sending message $message->{message}\n"; 
    my $contentType = 'application/json'; 
    my $payload = encode_json $message; 
    $connection->publish($channelId, 'test.route', $payload, { exchange => 'broadcast', force_utf8_in_header_strings => 1 }, { 'content_type' => $contentType }); 
    print "Sent!\n"; 
} 

sub main { 
    print "Starting...\n"; 

    $connection = Net::AMQP::RabbitMQ->new(); 
    $connection->connect('localhost', { user => 'guest', password => 'guest', port => 5672 }); 
    $connection->channel_open(++$channelId); 
    print "Connected!\n"; 

    # send first message 
    sendToRabbit({ message => 'body 1' }); 

    # fork child 
    my $child = fork(); 
    if(!$child) { 
    # child 
    sleep(1); 
    print "child exiting...\n"; 
    exit(0); 
    } 
    else { 
    # parent 
    waitpid($child, 0); 
    } 
    print "parent continuing...\n"; 

    # send second message - this will not be actually sent. 
    sendToRabbit({ message => 'body 2' }); 

    # allow I/O to settle... 
    sleep(1); 
} 

main(); 

編輯:溶液

由於池上有關解決方案的謎底!

在我的RabbitMQ管理對象中,我注入了一些代碼到connect()例程中,允許我有選擇地跳過分叉子的子句,這些子類本身不會調用connect()。這似乎具有預期的效果。

# Connect to RabbitMQ and create a channel 
sub connect { 
    my ($self) = @_; 

    $self->{pid} = $$; 

    # if we redefined the destructor and connect is called, we need to revert 
    # it so it can be redefined again properly 
    no warnings qw(redefine); 
    if($self->{original_destructor}) { 
    # reset original destructor 
    *Net::AMQP::RabbitMQ::DESTROY = $self->{original_destructor}; 
    delete $self->{original_destructor}; 
    } 

    # define alternate destructor so forked children that do not call "connect" do 
    # not destroy our connection 
    { 
    $self->debug("Overridding constructor..."); 
    $self->{original_destructor} = Net::AMQP::RabbitMQ->can('DESTROY'); 
    # only destroy the connection if the current pid is the owner's pid 
    my $new_destructor = sub { if($self->{pid} eq $$) { $self->debug("Destroying $_[0]!\n"); $self->{original_destructor}->(); } }; 
    *Net::AMQP::RabbitMQ::DESTROY = $new_destructor; 
    } 

    my $connection = Net::AMQP::RabbitMQ->new(); 
    $connection->connect('localhost', { user => $self->{username}, password => $self->{password}, port => $PORT, vhost => $VHOST }); 
    $self->{connection} = $connection; 
    $self->{channel} = $self->createChannel(); 

    1; 
} 

回答

2

孩子是父母的克隆,並且文件處理父母與孩子共享。作爲父母的副本,孩子擁有$connection的副本。當孩子退出時,這個對象被銷燬,調用它的析構函數,向RabbitMQ發送命令關閉連接。

您可以通過添加

{ 
    my $old_destructor = Net::AMQP::RabbitMQ->can('DESTROY'); 
    my $new_destructor = sub { print("Destroying $_[0]!\n"); $old_destructor->(); }; 
    no warnings qw(redefine); 
    *Net::AMQP::RabbitMQ::DESTROY = $new_destructor; 
} 

可能的解決方案看到這一點:

  • 移動子代碼到一個單獨的文件,並exec該文件。
  • 將子代碼移動到使用「secret」參數調用腳本時調用的子代碼中,並讓孩子使用該參數通過exec重新啓動。
  • 更快地創建孩子。具體來說,在創建RabbitMQ連接之前創建它。
  • 創建一個孩子來做RabbitMQ的東西。
  • 使用線程而不是子進程。

PS —不要寫自己fork + exec代碼。至少使用open3

sub spawn { 
    open(local *CHILD_STDIN, '<', '/dev/null') or die $!; 
    return open3('<&CHILD_STDIN', '>&STDOUT', '>&STDERR', @_); 
} 

sendToRabbit({ message => 'body 1' }); 

my $pid = spawn('child.pl');  
waitpid($pid, 0); 

sendToRabbit({ message => 'body 2' }); 
+0

謝謝你的快速和詳細的答案...我想這是這樣的東西,但不知道如何驗證它(謝謝你的片段)。 +1。 – Voluntari