6
我有一個腳本,用於檢查哪個MX記錄屬於一個電子郵件地址。我有大約30萬封電子郵件要檢查。所以單線程的過程需要很長時間。PHP/Beanstalkd:Spawning多個並行工作者
我已經有一個隊列beanstalkd和PHP通過文件發送電子郵件。不過,我只能讓一名工作人員執行隊列。目前,我正在爲一個過程產生10多名員工而感到茫然。
我運行do_job_mx.php,然後打開一個只包含電子郵件的文件並將它們傳遞給隊列。
PHP代碼採取從文件的電子郵件和放入隊列 - do_job_mx.php:
require_once('pheanstalk_init.php');
$pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300');
$filename = '_blank.txt';
$filename = dirname(__FILE__) . '/in/' . $filename;
foreach (file($filename, FILE_SKIP_EMPTY_LINES) as $line)
{
$json = json_encode(array("email" => trim($line)));
$pheanstalk
->useTube('process_mx')
->put($json);
}
爲職工的PHP代碼 - do_worker_process_mx.php:
class Worker
{
public function __construct()
{
$this->log('worker process - starting');
require_once('pheanstalk_init.php');
$this->pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300');
}
public function __destruct()
{
$this->log('worker process - ending');
}
public function run()
{
$this->log('worker process - starting to run');
while(1)
{
$job = $this->pheanstalk
->watch('process_mx')
->ignore('default')
->reserve();
$data = json_decode($job->getData(), true);
$this->process_mx($data);
$this->pheanstalk->delete($job);
}
}
private function process_mx($data)
{
$domain = explode("@", $data['email']);
dns_get_mx($domain[1], $mx_records);
$mx_array = explode(".", strtolower($mx_records[0]));
$mx = array_slice($mx_array, -2, count($mx_array));
$mx_domain = implode(".", $mx);
echo $data['email'] . "\n";
$this->write_file($mx_domain, $data['email']);
}
private function write_file($mx, $email)
{
$filename = fopen(dirname(__FILE__) . "/out/" . $mx . ".txt", 'ab+');
fwrite($filename, $email . "\n");
fclose($filename);
}
private function log($txt)
{
echo $txt . "\n";
}
}
$worker = new Worker();
$worker->run();
Supervisord的conf :
[program:do_worker_process]
command=/usr/bin/php /srv/www/mydev/public_html/esp/do_worker_process_mx.php
numprocs=10
numprocs_start=10
autostart=true
autorestart=true
stopsignal=QUIT
log_stdout=true
logfile=/var/log/supervisor/worker_process_mx.log
我目前處於癱瘓狀態,要說10+名員工的流程。運行的進程
編號:
# supervisorctl status
do_worker_process RUNNING pid 44343, uptime 1:46:11