2012-05-07 86 views
7

我的Perl腳本,需要同時運行多個線程...如何在Perl中實現信號量線程通信?

use threads ('yield', 'exit' => 'threads_only'); 
use threads::shared; 
use strict; 
use warnings; 
no warnings 'threads'; 
use LWP::UserAgent; 
use HTTP::Request; 
use HTTP::Async; 
use ... 

...和這樣的線程需要獲得來自網絡的一些信息,所以HTTP::Async使用。

my $request = HTTP::Request->new; 
    $request->protocol('HTTP/1.1'); 
    $request->method('GET'); 
    $request->header('User-Agent' => '...'); 

my $async = HTTP::Async->new(slots   => 100, 
           timeout   => REQUEST_TIMEOUT, 
           max_request_time => REQUEST_TIMEOUT); 

但一些線程需要,只有當其他線程(S)是這麼說的來訪問網絡。

my $start = [Time::HiRes::gettimeofday()]; 
my @threads =(); 
foreach ... { 
    $thread = threads->create(
    sub { 
      local $SIG{KILL} = sub { threads->exit }; 
      my $url = shift; 
      if ($url ...) { 
      # wait for "go" signal from other threads 
      } 
      my ($response, $data); 
      $request->url($url); 
      $data = ''; 
      $async->add($request); 
      while ($response = $async->wait_for_next_response) { 
      threads->yield(); 
      $data .= $response->as_string; 
      } 
      if ($data ...) { 
      # send "go" signal to waiting threads 
      } 
     } 
     }, $_); 

    if (defined $thread) { 
    $thread->detach; 
    push (@threads, $thread); 
    } 
} 

有可能是一個或多個線程等待「走出去」的信號,有可能是一個或多個線程,這種「走出去」的信號可以發送。一開始信號量的狀態是「等待」,一旦變爲「」,它將保持如此。

最後,應用程序會檢查最大運行時間。如果線程運行時間過長,則發送自終止信號。

my $running; 
do { 
    $running = 0; 
    foreach my $thread (@threads) { 
    $running++ if $thread->is_running(); 
    } 
    threads->yield(); 
} until (($running == 0) || 
     (Time::HiRes::tv_interval($start) > MAX_RUN_TIME)); 
$running = 0; 
foreach my $thread (@threads) { 
    if ($thread->is_running()) { 
    $thread->kill('KILL'); 
    $running++; 
    } 
} 
threads->yield(); 

現在的地步。我的問題是:

  1. 我怎樣才能最有效地碼在腳本等待「信號」(見上腳本註釋)。我應該只是簡單地使用一些虛擬共享變量sleep循環?

  2. 我需要在應用程序的末尾添加一些sleep循環來給時間自我毀滅的主題?

+0

難道我理解正確的話,您使用單獨的HTTP ::異步對象(複製,不共享,通過新的線程)來獲取最多一個網址每次線程一次? – pilcrow

+0

@pilcrow - 是的,它看起來像。浪費資源嗎? –

+0

它可能會也可能不是更少的內存或時間效率,但它是真正的消耗程序員循環。 :)設計難以理解,因此可能安全地更改/擴展,因爲這些組件看起來不太正確。 – pilcrow

回答

3

你可以看看Thread::Queue來完成這項工作。你可以設置一個隊列來處理等待'去'信號的線程和發送'去'信號的線程之間的信號。這裏有一個快速實物模型,我還沒有測試:

... 
use Thread::Queue; 
... 
# In main body 
my $q = Thread::Queue->new(); 
... 
$thread = threads->create(
    sub { 
      local $SIG{KILL} = sub { threads->exit }; 
      my $url = shift; 
      if ($url ...) { 
      # wait for "go" signal from other threads 
      my $mesg = $q->dequeue(); 
      # you could put in some termination code if the $mesg isn't 'go' 
      if ($mesg ne 'go') { ... } 
      } 
      ... 
      if ($data ...) { 
      # send "go" signal to waiting threads 
      $q->enqueue('go'); 
      } 
     } 
     }, $_); 
... 

需要等待一個「走」信號的離隊方法會等到的東西進入隊列中的線程。一旦消息進入隊列一個線程,並且只有一個線程將獲取消息並處理它。

如果您希望停止線程以便它們不會運行,您可以將停止消息插入隊列頭部。

$q->insert(0, 'stop') foreach (@threads); 

有在線程例子::隊列和threads CPAN分佈,在更詳細地示出這一點。

針對你的第二個問題,不幸的是,答案是否依賴。當你繼續終止你的線程時,乾淨關閉需要什麼樣的清理?如果地毯從線下拉出,可能發生的最壞情況是什麼?你會想要在任何時候計劃清理。你可以做的另一個選擇是等待每個線程實際完成。

我的評論詢問是否可以刪除detach調用的原因是因爲此方法允許主線程退出並且不在意發生在任何子線程上的情況。相反,如果你刪除了這個調用,並且添加:

$_->join() foreach threads->list(); 

到主塊的末尾,這將要求主應用程序等待每個線程實際完成。

如果您保留detach方法,那麼如果您需要線程執行任何清理操作,則需要在代碼的最後睡眠。當你在一個線程上調用detach時,你告訴Perl的是,當你的主線程退出時,你不關心線程在做什麼。如果主線程退出,並且仍有正在運行的線程已被分離,那麼程序將完成而沒有任何警告。但是,如果您不需要清理,並且您仍然撥打detach,隨時隨地退出。

+0

這個問題有一個開放的賞金值得+50的聲望。請**改進您的答案**。我發現你的帖子很有趣,但是你沒有迴應我發佈的第二個子問題(如果/如何等待**線程自毀) –

+0

@ user1215106我注意到你的代碼中你正在做一個' $ thread->分離;'。通常你用這個來忽略這個線程,如果它完成了,不用擔心。你有沒有理由在這裏,或者它可以被刪除? – Joel

+0

我相信它可以被刪除 –

-1

試試這樣的事情....

#!/usr/bin/perl 

use threads; 
use threads::shared; 

$|=1; 

my ($global):shared; 
my (@threads); 

push(@threads, threads->new(\&mySub,1)); 
push(@threads, threads->new(\&mySub,2)); 
push(@threads, threads->new(\&mySub,3)); 

$i = 0; 

foreach my $myThread(@threads) 

{ 
    my @ReturnData = $myTread->join ; 
    print "Thread $i returned: @ReturnData\n"; 
    $i++; 
} 

sub mySub 
{ 
    my ($threadID) = @_; 

    for(0..1000) 
    { 
     $global++; 
     print "Thread ID: $threadID >> $_ >> GLB: $global\n"; 
     sleep(1); 
    } 
    return($id); 
}