2016-07-07

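Perl - Parallel programming - Running two external programs

I have a Perl script that runs two external programs, one dependent on the other, over a series of datasets. Currently I do this one dataset at a time: I run it through the first program, collect the results with qx, and use those results to run the second program. Data from the second program's results is appended to an output file, one file per dataset. I've created a simple reproducible example that I hope captures my current approach: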

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) {
        chomp $_;
        #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";
        system($cmd_2);
    }
}

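Since the first program usually finishes much faster than the second, I thought this whole thing might be sped up by continuously running new queries through program_1 while program_2 is still running on previous queries. Speeding it up would be great, because it currently takes many hours to finish. However, I don't know how to go about it. Would something like Parallel::ForkManager be a solution? Or threads in Perl?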
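The pipelining idea above can be sketched with plain fork(), without any CPAN module: the parent runs program_1 for each query in turn and forks one child per query to run that query's program_2 jobs, so the next program_1 run overlaps with the previous query's program_2 work. This is a minimal sketch under stated assumptions: `program_1` and `program_2` are hypothetical Perl stand-ins for the external commands, and the output files go to a temporary directory. It starts one child per query with no upper limit; capping that number is the kind of bookkeeping Parallel::ForkManager exists for.

```perl
use strict;
use warnings;
use POSIX ();
use File::Temp qw(tempdir);

# Hypothetical stand-in for program_1: returns three results per query.
sub program_1 {
    my ($query) = @_;
    return map { $query * 10 + $_ } 1 .. 3;
}

# Hypothetical stand-in for program_2: appends one line to the query's file.
sub program_2 {
    my ($dir, $query, $param) = @_;
    open my $fh, '>>', "$dir/$query.output" or die "open: $!";
    print {$fh} "param $param\n";
    close $fh or die "close: $!";
}

my $dir     = tempdir(CLEANUP => 1);
my @queries = (2, 4, 3, 1);
my %query_for;                          # child pid => query it is handling

for my $query (@queries) {
    my @results = program_1($query);    # parent: program_1, one query at a time
    my $pid = fork() // die "Cannot fork: $!";
    if ($pid == 0) {
        # Child: run program_2 for this query while the parent moves on.
        my $ok = eval { program_2($dir, $query, $_) for @results; 1 };
        POSIX::_exit($ok ? 0 : 1);      # skip END blocks in the child
    }
    $query_for{$pid} = $query;
}

# Reap every child; remember which queries failed.
my @failed;
while ((my $pid = waitpid(-1, 0)) > 0) {
    my $query = delete $query_for{$pid};
    push @failed, $query if $? != 0;
}
print @failed ? "Failed queries: @failed\n" : "All queries done\n";
```

Children leave through `POSIX::_exit` so that END blocks, including File::Temp's directory cleanup, only run in the parent.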

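Now, in my actual code I do some error handling and set a timeout for program_2. I use fork, exec and $SIG{ALRM} to do this, but I don't really know what I'm doing with those. It's important that I keep this ability, otherwise program_2 may get stuck or fail without adequately reporting why. Below is what the error-handling code looks like. I don't think it belongs in the reproducible example, but at least you'll see what I'm trying to do. Here it is with error handling: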

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) {
        chomp $_;
        #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";

        my $childPid = fork();
        die "Cannot fork: $!" unless defined $childPid;
        if ($childPid == 0) {
            # Child: replace this process with program_2; exec only returns on failure
            exec($cmd_2) or die "Cannot exec program_2: $!";
        }
        eval {
            local $SIG{ALRM} = sub { die "Timed out" };
            alarm 10;
            waitpid($childPid, 0);
            alarm 0;
        };
        if ($@ =~ /Timed out/) {
            print "\tProgram_2 timed out. Skipping...\n";
            kill 2, $childPid;
            waitpid($childPid, 0);
        }
        elsif ($? != 0) {
            my $exitCode = $? >> 8;
            print "Program_2 exited with error code $exitCode. Retry...\n";
        }
    }
}
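For comparison, the fork/exec/alarm timeout pattern can be packaged as a small helper using only core Perl. This is a sketch, and `run_with_timeout` is a hypothetical name. The child exits if exec fails (so it never falls back into the caller's loop), the parent waits for that specific PID, and the child gets its own process group so that a timeout kills the whole shell pipeline (sleep, fortune, head) rather than only the sh process that exec started.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: run $cmd through /bin/sh and kill it after $secs seconds.
# Returns ($status, $timed_out), where $status is a raw wait status like $?.
sub run_with_timeout {
    my ($cmd, $secs) = @_;

    my $pid = fork();
    die "Cannot fork: $!" unless defined $pid;
    if ($pid == 0) {
        setpgrp(0, 0);    # own process group, so a timeout can kill sh and its children
        exec('/bin/sh', '-c', $cmd)
            or do { warn "Cannot exec: $!\n"; POSIX::_exit(127) };
    }
    setpgrp($pid, $pid);  # same thing from the parent side, to avoid a race (errors ignored)

    my $status;
    my $finished = eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm $secs;
        waitpid($pid, 0);
        $status = $?;
        alarm 0;
        1;
    };
    alarm 0;              # make sure no alarm is left pending
    return ($status, 0) if $finished;

    die $@ unless $@ eq "timeout\n";   # some unrelated error
    kill '-TERM', $pid;                # negative signal name: the whole process group
    waitpid($pid, 0);                  # reap the killed child
    return ($?, 1);
}
```

In the question's loop this would be called as `my ($status, $timed_out) = run_with_timeout($cmd_2, 10);`, checking `$timed_out` before looking at `$status >> 8`.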

All help appreciated.

Answer


One solution:

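use strict;
use warnings;
use threads;
use Thread::Queue 3.01;   # end() requires 3.01+

sub job1 { ... }   # placeholder: run program_1 on one query
sub job2 { ... }   # placeholder: run program_2 on one job1 result

{
    my $job1_request_queue = Thread::Queue->new();
    my $job2_request_queue = Thread::Queue->new();

    my $job1_thread = async {
        while (defined(my $job = $job1_request_queue->dequeue())) {
            my $result = job1($job);
            $job2_request_queue->enqueue($result);
        }

        # No more job1 work, so no more job2 work will be queued.
        $job2_request_queue->end();
    };

    my $job2_thread = async {
        while (defined(my $job = $job2_request_queue->dequeue())) {
            job2($job);
        }
    };

    $job1_request_queue->enqueue($_) for ...;   # ... = the list of queries

    # Tell the workers there's no more work, then wait for them to finish.
    $job1_request_queue->end();
    $_->join() for $job1_thread, $job2_thread;
}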
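A runnable instance of this two-queue pipeline with placeholder jobs might look like the sketch below. It assumes a threads-enabled perl. `job1` and `job2` are hypothetical stand-ins: `job1` returns several results per query, as program_1 does in the question, and each result becomes its own job2 item. The third queue, `$done_queue`, exists only to collect results for inspection. The loops test `defined` so that a legitimate result such as `0` does not end a worker early.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue 3.01;    # end() needs Thread::Queue 3.01+

# Hypothetical stand-ins: job1 yields several results per query (like program_1),
# job2 processes one of them (like program_2).
sub job1 { my ($query) = @_; return map { $query . ":$_" } 1 .. 3 }
sub job2 { my ($item)  = @_; return "done $item" }

my $job1_queue = Thread::Queue->new();
my $job2_queue = Thread::Queue->new();
my $done_queue = Thread::Queue->new();    # only here to collect job2 results

my $job1_thread = async {
    while (defined(my $query = $job1_queue->dequeue())) {
        $job2_queue->enqueue(job1($query));   # one job2 item per job1 result
    }
    $job2_queue->end();                       # no more job2 items will come
};

my $job2_thread = async {
    while (defined(my $item = $job2_queue->dequeue())) {
        $done_queue->enqueue(job2($item));
    }
};

$job1_queue->enqueue($_) for 2, 4, 3, 1;
$job1_queue->end();
$_->join() for $job1_thread, $job2_thread;

$done_queue->end();
my @finished;
while (defined(my $r = $done_queue->dequeue())) {
    push @finished, $r;
}
print scalar(@finished), " jobs finished\n";
```

With a single job2 worker, items are processed in FIFO order, so the first result is for query 2 and the last for query 1.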

You can even have as many workers of each type as you like.

use strict;
use warnings;
use threads;
use Thread::Queue 3.01;   # end() requires 3.01+

use constant NUM_JOB1_WORKERS => 1;
use constant NUM_JOB2_WORKERS => 3;

sub job1 { ... }   # placeholder: run program_1 on one query
sub job2 { ... }   # placeholder: run program_2 on one job1 result

{
    my $job1_request_queue = Thread::Queue->new();
    my $job2_request_queue = Thread::Queue->new();

    my @job1_threads;
    for (1..NUM_JOB1_WORKERS) {
        push @job1_threads, async {
            while (defined(my $job = $job1_request_queue->dequeue())) {
                my $result = job1($job);
                $job2_request_queue->enqueue($result);
            }
        };
    }

    my @job2_threads;
    for (1..NUM_JOB2_WORKERS) {
        push @job2_threads, async {
            while (defined(my $job = $job2_request_queue->dequeue())) {
                job2($job);
            }
        };
    }

    $job1_request_queue->enqueue($_) for ...;   # ... = the list of queries

    # End each queue only once everything feeding it is done, then join.
    $job1_request_queue->end();
    $_->join() for @job1_threads;
    $job2_request_queue->end();
    $_->join() for @job2_threads;
}

Use IPC::Run instead of qx to add a timeout. No signals needed.
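A minimal sketch of the IPC::Run approach, assuming IPC::Run is installed from CPAN: `run` takes the command as an array ref plus references for stdin, stdout and stderr, returns true when the command exits with status 0, and with `timeout($secs)` throws an exception whose message mentions the timeout when the limit is hit. `run_cmd` and the hash it returns are hypothetical. Capturing stderr also gives you something to report when program_2 fails.

```perl
use strict;
use warnings;
use IPC::Run qw(run timeout);

# Hypothetical helper: run @cmd (an argv list, no shell unless you pass one),
# capture stdout/stderr, and give up after $secs seconds.
sub run_cmd {
    my ($secs, @cmd) = @_;
    my ($in, $out, $err) = ('', '', '');

    # run() returns true if the command exited with status 0;
    # with timeout() it throws an exception if the time limit is hit.
    my $ok = eval { run(\@cmd, \$in, \$out, \$err, timeout($secs)) };
    if (my $e = $@) {
        return { ok => 0, timed_out => 1, out => $out, err => $err }
            if $e =~ /timeout/;
        die $e;    # something else went wrong (e.g. the command could not be started)
    }
    return { ok => ($ok ? 1 : 0), timed_out => 0, out => $out, err => $err };
}

# program_2 in the question is a shell pipeline, so it needs sh -c.
my $res = run_cmd(10, 'sh', '-c', 'sleep 1; echo hello | head -c 3');
if    ($res->{timed_out}) { print "Program_2 timed out. Skipping...\n" }
elsif (!$res->{ok})       { print "Program_2 failed: $res->{err}" }
else                      { print "Output: $res->{out}\n" }
```

Passing an argv list rather than a single string avoids the shell entirely when the command does not need pipes or redirection.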


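Hi ikegami, thanks for your help. Could you explain the ending and joining of the threads? When I tried the multi-worker approach, I got the error "Perl exited with active threads": most were running and not joined, and a few had finished and were not joined. I can post my latest code below as an answer. – Tsaari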


That tells the workers there's no more work, then waits for them to finish. Otherwise, the program would end prematurely. – ikegami


Fixed a bug in my code. ('@job1_threads' and '@job2_threads' weren't being populated.) – ikegami