2013-05-01

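PHP/Beanstalkd: Spawning multiple parallel workers

I have a script that checks which MX record belongs to an email address. I have about 300,000 emails to check, so a single-threaded process takes a long time.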

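I already have a beanstalkd queue, and a PHP script feeds it with emails read from a file. However, I can only get one worker to consume the queue. At the moment I'm at a loss as to how to spawn 10+ workers for one process.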

I run do_job_mx.php, which opens a file containing only email addresses and passes them to the queue.

PHP code that takes the emails from the file and puts them into the queue - do_job_mx.php:

require_once('pheanstalk_init.php'); 

$pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300'); 

$filename = '_blank.txt'; 
$filename = dirname(__FILE__) . '/in/' . $filename; 

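// FILE_SKIP_EMPTY_LINES only takes effect together with FILE_IGNORE_NEW_LINES.
foreach (file($filename, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) 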
{ 
    $json = json_encode(array("email" => trim($line))); 

    $pheanstalk 
     ->useTube('process_mx') 
     ->put($json); 
} 

PHP code for the worker - do_worker_process_mx.php:

class Worker 
{ 
    public function __construct() 
    { 
     $this->log('worker process - starting'); 

     require_once('pheanstalk_init.php'); 
     $this->pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300'); 
    } 

    public function __destruct() 
    { 
     $this->log('worker process - ending'); 
    } 

    public function run() 
    { 
     $this->log('worker process - starting to run'); 

     while(1) 
     { 
      $job = $this->pheanstalk 
       ->watch('process_mx') 
       ->ignore('default') 
       ->reserve(); 

      $data = json_decode($job->getData(), true); 

      $this->process_mx($data); 

      $this->pheanstalk->delete($job); 
     } 
    } 

    private function process_mx($data) 
    { 
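     // Skip addresses that have no domain part.
     $domain = explode("@", $data['email']);
     if (!isset($domain[1]) || $domain[1] === '') {
      return;
     }

     // dns_get_mx() returns false when the lookup finds no MX records.
     if (!dns_get_mx($domain[1], $mx_records) || empty($mx_records)) {
      return;
     }

     // Keep only the last two labels of the first MX host returned.
     $mx_array = explode(".", strtolower($mx_records[0])); 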

     $mx = array_slice($mx_array, -2, count($mx_array)); 

     $mx_domain = implode(".", $mx); 

     echo $data['email'] . "\n"; 

     $this->write_file($mx_domain, $data['email']); 
    }  

    private function write_file($mx, $email) 
    { 
     $filename = fopen(dirname(__FILE__) . "/out/" . $mx . ".txt", 'ab+'); 

     fwrite($filename, $email . "\n"); 

     fclose($filename); 
    } 

    private function log($txt) 
    { 
     echo $txt . "\n"; 
    } 
} 

$worker = new Worker(); 
$worker->run(); 
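
Once several copies of this worker run in parallel, more than one process may append to the same file under out/ at the same moment. The sketch below shows one cautious way to handle that with an exclusive lock around each append. The function name `append_line_locked` is hypothetical and not part of the original script, and whether the lock is strictly needed depends on the filesystem, so treat it as an option rather than a requirement.

```php
<?php
// Hypothetical helper (not in the original worker): append one line to a
// file while holding an exclusive lock, so concurrent workers do not
// interleave their writes.
function append_line_locked($path, $line)
{
    // FILE_APPEND opens the file in append mode; LOCK_EX acquires an
    // exclusive lock for the duration of the write.
    $bytes = file_put_contents($path, $line . "\n", FILE_APPEND | LOCK_EX);

    // file_put_contents() returns false on failure, otherwise the byte count.
    return $bytes !== false;
}
```

Inside write_file() this could stand in for the fopen/fwrite/fclose sequence, for example `append_line_locked(dirname(__FILE__) . "/out/" . $mx . ".txt", $email);`.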

Supervisord conf:

[program:do_worker_process] 
command=/usr/bin/php /srv/www/mydev/public_html/esp/do_worker_process_mx.php 
numprocs=10 
numprocs_start=10 
autostart=true 
autorestart=true 
stopsignal=QUIT 
log_stdout=true 
logfile=/var/log/supervisor/worker_process_mx.log 

I'm currently stuck on how to spawn 10+ worker processes.

Number of running processes:

# supervisorctl status 

do_worker_process RUNNING pid 44343, uptime 1:46:11 

Answers

CentOS 6 ships with:

beanstalkd 1.4.6 and Supervisor 2.1.8

I simply needed to upgrade to Supervisor 3.0.

Now I have multiple workers running.
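
For anyone making the same upgrade, here is a sketch of how the program section from the question might look under Supervisor 3.x. It is not the exact file used here. Supervisor 3.x requires `process_name` to include `%(process_num)s` whenever `numprocs` is greater than 1, and it uses `stdout_logfile` for per-program output; as far as I know, `log_stdout` and `logfile` in the question's config are 2.x-era keys. The per-process log file name is an assumption derived from the path in the question.

```ini
[program:do_worker_process]
command=/usr/bin/php /srv/www/mydev/public_html/esp/do_worker_process_mx.php
; Required when numprocs > 1 so that each process gets a unique name.
process_name=%(program_name)s_%(process_num)02d
numprocs=10
numprocs_start=10
autostart=true
autorestart=true
stopsignal=QUIT
; Assumed log location: one file per worker process.
stdout_logfile=/var/log/supervisor/worker_process_mx_%(process_num)02d.log
redirect_stderr=true
```

After reloading the configuration, `supervisorctl status` should list ten entries for the program instead of one.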