2017-08-26 54 views
2

我在php中使用PThreads進行多線程。我已經在Windows上的XAMPP服務器上成功安裝並運行了它。 我有100K記錄在我的數據庫,我想運行parallel.Every螺紋20個線程將調用來自數據庫和流程5K記錄them.Here是我這個PHP的Pthread問題

require('mailscript.php'); 
class My extends Thread{ 

    function __construct() { 
     $this->mailscript = new mailscript(); 
    } 

    function run(){ 
     $this->mailscript->runMailScript(5000); 
    } 
} 

for($i=0;$i<20;$i++){ 
    $pool[] = new My(); 
} 

foreach($pool as $worker){ 
    $worker->start(); 
} 
foreach($pool as $worker){ 
    $worker->join(); 
} 

代碼時,我只運行此代碼它每個線程最多運行約600條記錄。對於PThread中的線程數有任何限制問題。有什麼問題請幫助我

回答

1

hie,這裏是我將如何處理pthread與你的例子與一個集合方法,如果必要的話會被使用。希望這會幫助你。

/*pthreads batches */ 
$batches = array(); 

$nbpool = 20; // cpu 10 cores 

/* job 1 */ 
$list = [/* data1 */]; 
$batches[] = array_chunk($list, 5000); 

/* job 2 */ 
$list2 = [/* data2 */]; 
$batches[] = array_chunk($list2, 10000); 

/*final collected results*/ 
$resultFinal = []; 

/* loop across batches */ 
foreach ($batches as $key => $chunks) { 

    /* for intermediate collection */ 
    $data[$key] = []; 

    /* how many workers */ 
    $workCount = count($chunks); 

    /* set pool job up to max cpu capabilities */ 
    $pool = new Pool($nbpool, Worker::class); 

    /* pool cycling submit */ 
    foreach (range(1, $workCount) as $i) { 
     $chunck = $chunks[$i - 1]; 
     $pool->submit(new processWork($chunck)); 
    } 

    /* on collection cycling */ 
    $collector = function (\Collectable $work) use (&$data) { 

     /* is worker complete ? */ 
     $isGarbage = $work->isGarbage(); 

     /* worker complete */ 
     if ($isGarbage) { 
      $data[$key] = $work->result; 
     } 
     return $isGarbage; 
    }; 
    do { 
     /* collection on pool stack */ 
     $count = $pool->collect($collector); 
     $isComplete = count($data) === $workCount; 
    } while (!$isComplete); 

    /* push stack results */ 
    array_push($resultFinal, $data); 

    /* close pool */ 
    $pool->shutdown(); 
} 

class processWork extends \Threaded implements \Collectable { 

    private $isGarbage; 
    private $process; 
    public $result; 

    public function __construct($process) { 
     $this->process = $process; 
    } 

    public function run() { 
     $workerDone = array(); 
     foreach ($this->process as $k => $el) { 
      /* whatever stuff with $this->process */ 
     } 
     $this->result = $workerDone; 
     $this->isGarbage = true; // yeah, it s done 
    } 

    public function isGarbage(): bool { 
     return $this->isGarbage; 
    } 
}