2011-12-07 104 views
3

我想創建一個具有多處理功能的小Perl程序。由於我的要求在這裏和那裏有一些小的變化,我無法在任何地方找到任何類似的示例腳本。在Perl中分叉多個孩子並使用管道進行雙向通信

我需要閱讀從STDIN一個大的日誌文件,並給線到第一子進程的第N個(也是一個巨大的數字),然後到第二個子進程的下一N多等我也有一個恆定定義哪個是允許併發運行的子進程的最大數量。一旦達到最大數量的孩子,父母會等待孩子完成工作,並給予另外N個線路。

父進程還收集每個子進程返回的多行(5-10行)輸出,當它們完成並將其存儲在數組中時。 Parent然後繼續處理該數組內容並最終顯示結果。

有沒有更好的示例腳本,我可以修改和使用,或有人可以幫助我在這裏分享一個嗎?我更喜歡只使用管道進行過程互通,並儘可能簡化事情。

編輯: 有人可以演示如何使用IO :: Handle模塊中的管道完成這個任務嗎?

回答

4

使用Forks::Super,這可以很容易地抑制同時進程的數量並處理進程間通信。例如,

use Forks::Super MAX_PROC => 10,  # allow 10 simultaneous processes 
       ON_BUSY => 'queue'; # don't block when >=10 jobs are active 

@loglines = <>; 

# set up all the background jobs 
while (@loglines > 0) { 
    $pid = fork { 
     args => [ splice @loglines, 0, $N ], # to pass to sub, below 
     child_fh => "out", # make child STDOUT readable by parent 
     sub => sub { 
      my @loglines = @_; 
      my @result = ... do something with loglines ... 
      print @results; # use $pid->read_stdout() to read in child 
     } 
    }; 
} 

# get the results 
while ($pid = waitpid -1, 0) { 
    last if $pid == -1; 
    my @results_from_job = $pid->read_stdout(); 
    push @results, @results_from_job; 
} 
+0

看起來像我真正想要的。但是,我只在有限數量的Perl模塊可用的受限環境中運行此腳本。我們可以單獨使用IO :: Handle並實現相同嗎? – meharo

-1

我發現線程對於這類過程要簡單得多。你需要線程和線程::隊列模塊。該過程是設置一個隊列來提供工作線程,併爲他們返回結果。工作者線程只是一個讀取記錄,處理它並將結果發回的函數。我只是把這段代碼放在一起,並沒有測試它,所以它可能是越野車,但我認爲顯示了大致的想法:

use threads(); 
use Thread::Queue; 
# 
# 
#   Set limit on number of workers 
# 
my $MAX_THREADS = 5; 
my $EOD = "\n\n"; 
# 
# 
#   Need a queue to feed the workers 
#   and one for them to return results 
# 
my $Qresult = Thread::Queue->new(); 
my $rec; 
my $n; 
# 
# 
#   load STDIN into the input queue 
# 
my $Qin = Thread::Queue->new(<>); 
# 
# 
#   start worker threads 
# 
for($n = 0; $n < $MAX_THREADS; ++$n) 
{ 
    async{ProcessRecord($n);}; 
    $Qin->enqueue($EOD);   # need terminator for each 
} 
# 
# 
# 
#   Wait for the results to come in 
# 
$n = 0; 
while($n < $MAX_THREADS) 
{ 
      $rec = $q->dequeue(); 
      if($rec eq $EOD) 
      { 
       ++$n; 
       next; 
      } 

      : 
      : 
      : 
    #-- process result --# 
      : 
      : 
      : 


    threads->yield(); # let other threads get a chance 
    sleep 1; 
} 
exit;  
######################################  
# 
# 
#   Worker threads draw from the queue 
#   when a "terminator" is read, quit; 
# 
sub ProcessRecord 
{ 
    my $rec; 
    my $result; 

    while(1) 
    { 
     $rec = $Qin->dequeue(); 
     last if $rec eq $EOD; 
      : 
      : 
      : 
     #-- process record --# 
      : 
      : 
      : 
     $Qresult->enqueue($result); 

     threads->yield(); # let other threads get a chance 
    } 

    threads->exit();  
}