2

給定以下代碼,如何確保完成的MyWorker對象被銷燬/釋放內存?PHP pThreads - 你如何執行垃圾收集?

由於我的腳本的作用,我需要約50個線程不斷從cURL獲取數據並對其進行處理。

我已經嘗試了線程永不離開run(),或者如這個示例代碼所示,他們離開運行並使收集功能產生它們的新副本。

但是無論我在一分鐘左右的時間內遇到了什麼內存限制,你能告訴我我做錯了什麼嗎?

class MyWorker extends Threaded 
{ 
    public $complete; 
    public function __construct() {$this->complete = false;} 
    public function run() {$this->complete = true;} 
} 

$pool = new Pool(50); 
for($i=0; $i<50; $i++) 
    $pool->submit(new MyWorker()); 
$pool->collect(function($worker) 
{ 
    global $pool; 
    if($worker->complete == true) 
     $pool->submit(new MyWorker()); 
    return $worker->complete; 
}); 
$pool->shutdown(); 

回答

8

爲什麼

我爲什麼要收集呢?

Worker pthreads提供的線程要求程序員保留對正在執行的Threaded對象的正確引用。程序員難以在用戶區可靠地實現,所以pthreads提供了抽象Workers,它爲您維護引用。

爲了維護這些引用,pthread需要知道對象什麼時候是垃圾,它提供了用於此目的的Pool::collect接口。 Pool::collect需要一個Closure,它應該接受Threaded對象,並返回布爾型true如果傳遞的對象完成執行。

如何

眼前的任務...

爲了保持提交執行任務不排氣資源,則必須重新提交創建完成的任務隊列爲Pool

以下代碼演示了這樣做的一種理智方式:

<?php 

define("LOG", Mutex::create()); 
/* thread safe log to stdout */ 
function slog($message, $args = []) { 
    $args = func_get_args(); 
    if (($message = array_shift($args))) { 
     Mutex::lock(LOG); 
     echo vsprintf(
      "{$message}\n", $args); 
     Mutex::unlock(LOG); 
    } 
} 

class Request extends Threaded { 
    public function __construct($url) { 
     $this->url = $url; 
    } 

    public function run() { 
     $response = @file_get_contents($this->url); 

     slog("%s returned %d bytes", 
      $this->url, strlen($response)); 

     $this->reQueue(); 
    } 

    public function getURL()  { return $this->url; } 

    public function isQueued()  { return $this->queued; } 
    public function reQueue()  { $this->queued = true; } 

    protected $url; 
    protected $queued = false; 
} 

/* create a pool of 50 threads */ 
$pool = new Pool(50); 

/* submit 50 requests for execution */ 
while (@$i++<50) { 
    $pool->submit(new Request(sprintf(
     "http://google.com/?q=%s", md5($i)))); 
} 

do { 
    $queue = array(); 

    $pool->collect(function($request) use ($pool, &$queue) { 
     /* check for items to requeue */ 
     if ($request->isQueued()) { 
      /* get the url for the request, insert into queue */ 
      $queue[] = 
       $request->getURL(); 
      /* allow this job to be collected */ 
      return true; 
     } 
    }); 

    /* resubmit completed tasks to pool */ 
    if (count($queue)) { 
     foreach ($queue as $queued) 
      $pool->submit(new Request($queued)); 
    } 

    /* sleep for a couple of seconds here ... because, be nice ! */ 
    usleep(2.5 * 1000000); 
} while (true); 
?>