從熟悉的前面事情開始。
#! /usr/bin/env perl
use strict;
use warnings;
use 5.10.0; # for // (defined-or)
use IO::Handle;
use IO::Select;
use LWP::Simple;
use POSIX qw/ :sys_wait_h /;
use Socket;
全局常量控制程序的執行。
my $DEBUG = 0;
my $EXIT_COMMAND = "<EXIT>";
my $NJOBS = 10;
的URL來檢查插座的工人抵達終點每行一個。對於每個URL,工作人員調用LWP::Simple::head
來確定資源是否可以獲取。然後,工人寫回插座線形式網址的: *狀態*,其中*狀態*或者是"YES"
或"NO"
和代表空格字符。
如果URL是$EXIT_COMMAND
,那麼工作人員立即退出。
sub check_sites {
my($s) = @_;
warn "$0: [$$]: waiting for URL" if $DEBUG;
while (<$s>) {
chomp;
warn "$0: [$$]: got '$_'" if $DEBUG;
exit 0 if $_ eq $EXIT_COMMAND;
print $s "$_: ", (head($_) ? "YES" : "NO"), "\n";
}
die "NOTREACHED";
}
要創建工人,我們首先創建一個socketpair
。父進程將使用一端,每個工人(子)將使用另一端。我們在兩端禁用緩衝並將父節點添加到我們的IO :: Select實例。我們還注意到每個孩子的進程ID,所以我們可以等待所有工作人員完成。
sub create_worker {
my($sel,$kidpid) = @_;
socketpair my $parent, my $kid, AF_UNIX, SOCK_STREAM, PF_UNSPEC
or die "$0: socketpair: $!";
$_->autoflush(1) for $parent, $kid;
my $pid = fork // die "$0: fork: $!";
if ($pid) {
++$kidpid->{$pid};
close $kid or die "$0: close: $!";
$sel->add($parent);
}
else {
close $parent or die "$0: close: $!";
check_sites $kid;
die "NOTREACHED";
}
}
爲了發送URL,父級抓取儘可能多的讀取器,並從作業隊列中提取相同數量的URL。在作業隊列爲空的任何工作人員都會收到退出命令。
請注意,print
將失敗,如果底層的工人已經退出。父母必須忽略SIGPIPE
以防止立即終止。
sub dispatch_jobs {
my($sel,$jobs) = @_;
foreach my $s ($sel->can_write) {
my $url = @$jobs ? shift @$jobs : $EXIT_COMMAND;
warn "$0 [$$]: sending '$url' to fd ", fileno $s if $DEBUG;
print $s $url, "\n" or $sel->remove($s);
}
}
當時間控制到達read_results
,工人已經創建並接受了工作。現在父母使用can_read
等待一個或多個工作人員的結果。定義的結果是當前工作人員的回答,未定義的結果意味着孩子退出並關閉了套接字的另一端。
sub read_results {
my($sel,$results) = @_;
warn "$0 [$$]: waiting for readers" if $DEBUG;
foreach my $s ($sel->can_read) {
warn "$0: [$$]: reading from fd ", fileno $s if $DEBUG;
if (defined(my $result = <$s>)) {
chomp $result;
push @$results, $result;
warn "$0 [$$]: got '$result' from fd ", fileno $s if $DEBUG;
}
else {
warn "$0 [$$]: eof from fd ", fileno $s if $DEBUG;
$sel->remove($s);
}
}
}
爲了收集所有結果,家長必須跟蹤現場工作人員。
sub reap_workers {
my($kidpid) = @_;
while ((my $pid = waitpid -1, WNOHANG) > 0) {
warn "$0: [$$]: reaped $pid" if $DEBUG;
delete $kidpid->{$pid};
}
}
運行池執行上面的子目錄以分派所有URL並返回所有結果。
sub run_pool {
my($n,@jobs) = @_;
my $sel = IO::Select->new;
my %kidpid;
my @results;
create_worker $sel, \%kidpid for 1 .. $n;
local $SIG{PIPE} = "IGNORE"; # writes to dead workers will fail
while (@jobs || keys %kidpid || $sel->handles) {
dispatch_jobs $sel, \@jobs;
read_results $sel, \@results;
reap_workers \%kidpid;
}
warn "$0 [$$]: returning @results" if $DEBUG;
@results;
}
使用的示例主程序
my @jobs = qw(
bogus
http://stackoverflow.com/
http://www.google.com/
http://www.yahoo.com/
);
my @results = run_pool $NJOBS, @jobs;
print $_, "\n" for @results;
輸出是
bogus: NO
http://www.google.com/: YES
http://stackoverflow.com/: YES
http://www.yahoo.com/: YES
自己的崗位重複[如何判斷一個網頁存在?(http://stackoverflow.com/questions/11594932/how-to-tell-if-a-webpage-exists) – EJP 2012-07-22 03:21:28