2015-12-14 85 views
5

我使用Symfony2和RabbitMqBundle創建一個將文檔發送到ElasticSearch的工作人員。以逐一比率索引文檔比使用ElasticSearch批量API慢得多。因此,我創建了一個緩衝區,以千組爲單位將文檔刷新到ES。代碼看起來(有點簡化)如下:經過一段時間不活動後在PHP CLI腳本中運行函數

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 
    } 
} 

這一切都很好,但有一個小問題。隊列以不可預知的速率填充消息。有時在5分鐘內有10萬次,有時不會持續數小時。例如,如果有82671個文檔在排隊,則最後的671個文檔在收到另外329個可能需要幾個小時的文檔之前不會被索引。我希望能夠做到以下幾點:

警告:科幻代碼!這顯然是行不通的:

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 
    protected $flushTimer; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 

     // Highly Sci-fi code 
     $this->flushTimer = new Timer(); 
     // Flush buffer after 5 minutes of inactivity. 
     $this->flushTimer->setTimeout(5 * 60); 
     $this->flushTimer->setCallback([$this, 'flush']); 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } else { 
      // Start a timer that will flush the buffer after a timeout. 
      $this->initTimer(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 

     // There are no longer messages to be send, stop the timer. 
     $this->flushTimer->stop(); 
    } 

    protected function initTimer() 
    { 
     // Start or restart timer 
     $this->flushTimer->isRunning() 
      ? $this->flushTimer->reset() 
      : $this->flushTimer->start(); 
    } 
} 

現在,我知道PHP不是事件驅動的侷限性。但是,這是2015年,有像ReactPHP的解決方案,所以這應該是可能的權利?對於ØMQ,有this function。什麼是解決方案,將爲RabbitMQ工作或獨立於任何消息隊列擴展?

的解決方案,我很懷疑:

  1. crysalead/code。它使用declare(ticks = 1);模擬一個計時器。我不確定這是否是一種高性能和可靠的方法。有任何想法嗎?
  2. 我可以運行一個cronjob,每5分鐘發佈一次'FLUSH'消息到同一個隊列,然後在收到這條消息時明確地刷新緩衝區,但那會是作弊。
+0

不完全是你在找什麼,但可能是一個很好的解決方案。 存儲上次運行'flush'命令的時間,以及添加文檔時還要檢查時間。如果它已經超過5分鐘沖洗。 第二好的選擇是cronjob恕我直言 –

+0

問題是,當你長時間沒有收到任何消息時,你不能檢查時間,因此緩衝區不會被刷新。 cronjob在不同的進程中運行PHP,因此不能訪問緩衝區。 – Xatoo

+0

因此,您在長時間運行的PHP進程中運行該代碼?在這種情況下,你可能會使用信號(就像你的號碼1選項一樣)看看[這裏](http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal )和[這裏](http://www.hackingwithphp。COM/16/1/2 /定時您-信號)。這些信號是非阻塞的,我自己還沒有使用它,但它可能只是你的用例需要的東西。 –

回答

0

正如我在我的評論中提到的,您可以使用這些信號。 PHP允許您向腳本信號註冊信號處理程序(即SIGINT,SIGKILL等)

對於您的用例,您可以使用SIGALRM信號。這個信號會在一段時間(可以設置)過期後報警你的腳本。這些信號的積極方面是它們是非阻塞的。換句話說,腳本的正常操作不會受到干擾。

調整後的解決方案(蜱因爲PHP 5.3不建議使用):

function signal_handler($signal) { 
    // You would flush here 
    print "Caught SIGALRM\n"; 
    // Set the SIGALRM timer again or it won't trigger again 
    pcntl_alarm(300); 
} 

// register your handler with the SIGALRM signal 
pcntl_signal(SIGALRM, "signal_handler", true); 
// set the timeout for the SIGALRM signal to 300 seconds 
pcntl_alarm(300); 

// start loop and check for pending signals 
while(pcntl_signal_dispatch() && your_loop_condition) { 
    //Execute your code here 
} 

注意:你只能在你的腳本中使用1個SIGALRM信號,如果你設置你的信號的時間pcntl_alarm計時器您警報將復位(不觸發信號)到其新設定的值。

+0

是的,和我在我的問題中提到的「crysalead/code」項目一樣。但是,它使用「聲明滴答聲」,我懷疑在每個語句之後中斷PHP執行是否是一個高性能解決方案。你有什麼經驗嗎? – Xatoo

+0

另外,您提供的鏈接指向解釋使用滴答的頁面的鏈接已棄用。大多數提到的蜱都表示使用蜱在大多數情況下是反模式。因此我感興趣是否有其他選擇。 – Xatoo

+0

您的權利,我忽略了表示滴答的部分已被棄用。我做了一些挖掘,發現了一個不推薦的選擇。通過使用'pcntl_signal_dispatch()',您可以確定自己何時檢查待處理的信號i.s.o.每次打勾運行處理程序。調整後的解決方案也會更高性能。希望這有助於。 –

相關問題