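Run a function in a PHP CLI script after a period of inactivity

I'm using Symfony2 and the RabbitMqBundle to create a worker that sends documents to ElasticSearch. Indexing documents one at a time is much slower than using the ElasticSearch bulk API, so I created a buffer that flushes documents to ES in groups of a thousand. The code looks (somewhat simplified) like this: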
```php
class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this amount of documents, then the buffer's content
    // is sent to ElasticSearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when the maximum buffer size has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}
```
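This all works fine, but there is one small problem. The queue fills with messages at an unpredictable rate: sometimes 100,000 in 5 minutes, sometimes none for hours. For example, if 82,671 documents are queued, the last 671 documents won't be indexed until another 329 documents arrive, which may take hours. I would like to be able to do the following: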
Warning: sci-fi code! This obviously won't work:
```php
class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this amount of documents, then the buffer's content
    // is sent to ElasticSearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly sci-fi code
        $this->flushTimer = new Timer();
        // Flush buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when the maximum buffer size has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no more messages to be sent, stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart timer
        $this->flushTimer->isRunning()
            ? $this->flushTimer->reset()
            : $this->flushTimer->start();
    }
}
```
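Now, I know about PHP's limitation of not being event-driven. But this is 2015, and there are solutions like ReactPHP, so this should be possible, right? For ØMQ there is this function. What is a solution that would work for RabbitMQ, or independently of any message queue extension?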
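Solutions I'm skeptical about:

- There is crysalead/code. It uses `declare(ticks = 1);` to simulate a timer. I'm not sure whether this is a performant and reliable approach. Any ideas?
- I could run a cronjob that publishes a 'FLUSH' message to the same queue every 5 minutes and then explicitly flush the buffer when that message is received, but that would be cheating.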
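For reference, the cron variant from the second bullet only needs the consumer to recognise a control message. A minimal sketch, assuming a hypothetical convention where the cron job publishes `['type' => 'flush']` to the same queue; the class name is illustrative and `$batches` stands in for calls to `addDocuments()`:

```php
<?php

// Sketch of the "cron publishes a FLUSH message" idea. The ['type' => 'flush']
// shape is a hypothetical convention, not something RabbitMqBundle defines.
class ControlAwareBuffer
{
    const MAX_BUFFER_SIZE = 1000;

    public $buffer = [];
    public $batches = []; // stand-in for documents sent with addDocuments()

    public function onMessage(array $message)
    {
        if (isset($message['type']) && $message['type'] === 'flush') {
            // Control message: flush whatever is buffered, index nothing.
            $this->flush();
            return;
        }
        $this->buffer[] = $message;
        if (count($this->buffer) >= self::MAX_BUFFER_SIZE) {
            $this->flush();
        }
    }

    public function flush()
    {
        if ($this->buffer) {
            $this->batches[] = $this->buffer;
        }
        $this->buffer = [];
    }
}
```

With 82,671 documents this sends 82 full batches and keeps 671 buffered until the control message arrives. If several workers consume the same queue, one FLUSH message reaches only one of them, so each worker would need its own control message or queue.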
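Not exactly what you're looking for, but it might be a good solution: store the time at which 'flush' last ran, and check the time whenever you add a document. If more than 5 minutes have passed, flush. The second-best option is a cronjob, IMHO. –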
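A minimal sketch of this suggestion, to make its trade-off concrete. `FlushOnAddBuffer` is a hypothetical name, the current time is passed in explicitly so the behaviour can be checked without waiting, and `$flushed` stands in for `addDocuments()`:

```php
<?php

// Sketch of "flush on add if the last flush was more than $maxAge seconds ago".
// FlushOnAddBuffer is hypothetical; $flushed stands in for addDocuments().
class FlushOnAddBuffer
{
    public $pending = [];
    public $flushed = [];
    private $lastFlush;
    private $maxAge;

    public function __construct($now, $maxAge = 300)
    {
        $this->lastFlush = $now;
        $this->maxAge = $maxAge;
    }

    public function add($document, $now)
    {
        $this->pending[] = $document;
        // The time is only checked here, i.e. only when a new message arrives.
        if ($now - $this->lastFlush >= $this->maxAge) {
            $this->flushed = array_merge($this->flushed, $this->pending);
            $this->pending = [];
            $this->lastFlush = $now;
        }
    }
}

$buffer = new FlushOnAddBuffer(0);
$buffer->add('a', 10);  // 10 s since the last flush: stays pending
$buffer->add('b', 400); // 400 s: flushes 'a' and 'b'
$buffer->add('c', 500); // 100 s since the last flush: stays pending...
// ...and nothing flushes 'c' until another message arrives, however long that takes.
```

The last call shows the catch raised in the reply that follows: document `c` stays pending until some later message triggers `add()` again.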
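The problem is that when you don't receive any messages for a long time, you can't check the time, so the buffer won't be flushed. A cronjob runs PHP in a different process, so it can't access the buffer. – Xatoo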
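One way around this, assuming the consumer's blocking wait can be given a timeout: keep a deadline instead of a timer, block for at most the time left until that deadline, and check it whenever the wait returns, with or without a message. `DeadlineBuffer` and its methods are hypothetical; the sink is anything with an `addDocuments(array)` method, and the clock is injected so the logic can be tested without waiting:

```php
<?php

// Sketch: replace the sci-fi Timer with a deadline that the consume loop checks
// each time its blocking wait returns (with a message or because it timed out).
// DeadlineBuffer is hypothetical; $sink is any object with addDocuments(array).
class DeadlineBuffer
{
    private $sink;
    private $clock;
    private $maxSize;
    private $idleSeconds;
    private $buffer = [];
    private $deadline = null; // null: buffer empty, no "timer" running

    public function __construct($sink, callable $clock, $maxSize = 1000, $idleSeconds = 300)
    {
        $this->sink = $sink;
        $this->clock = $clock;
        $this->maxSize = $maxSize;
        $this->idleSeconds = $idleSeconds;
    }

    public function add($document)
    {
        $this->buffer[] = $document;
        if (count($this->buffer) >= $this->maxSize) {
            $this->flush();
        } else {
            // (Re)start the countdown: flush after $idleSeconds without new messages.
            $this->deadline = call_user_func($this->clock) + $this->idleSeconds;
        }
    }

    // How long the consume loop may block; null means no limit (nothing buffered).
    public function secondsUntilDeadline()
    {
        if ($this->deadline === null) {
            return null;
        }
        return max(0.0, $this->deadline - call_user_func($this->clock));
    }

    public function flushIfDue()
    {
        if ($this->deadline !== null && call_user_func($this->clock) >= $this->deadline) {
            $this->flush();
        }
    }

    public function flush()
    {
        if ($this->buffer) {
            $this->sink->addDocuments($this->buffer);
        }
        $this->buffer = [];
        $this->deadline = null;
    }
}
```

In the consume loop this would be roughly: call `flushIfDue()`, wait for at most `secondsUntilDeadline()` seconds (or indefinitely when it returns `null`), and pass any received message to `add()`. How to pass that timeout depends on the client. php-amqplib's `AMQPChannel::wait()`, for instance, accepts a timeout as its third argument and throws `AMQPTimeoutException` when it expires; I would verify that against the version RabbitMqBundle pulls in.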
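So you're running this code in a long-running PHP process? In that case you could use signals (like your option number 1). Have a look [here](http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal) and [here](http://www.hackingwithphp.com/16/1/2/timing-your-signals). These signals are non-blocking. I haven't used them myself, but they might be just what your use case needs. –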
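A minimal sketch of the signal idea, assuming the pcntl extension is available (PHP CLI on a Unix-like system). `pcntl_alarm()` schedules a SIGALRM and cancels any previously scheduled one, so calling it on every message restarts the inactivity countdown. `pcntl_async_signals()` needs PHP 7.1 or later; on older versions you would use `declare(ticks = 1);` or call `pcntl_signal_dispatch()` yourself. The class name is hypothetical and `$flushed` only counts documents instead of calling `addDocuments()`:

```php
<?php

// Requires ext-pcntl (CLI, Unix-like OS). pcntl_async_signals() needs PHP >= 7.1;
// on older versions use declare(ticks = 1); or call pcntl_signal_dispatch() in the loop.
pcntl_async_signals(true);

// Hypothetical buffer that flushes itself after $idleSeconds without new documents.
class AlarmFlushBuffer
{
    public $pending = [];
    public $flushed = 0; // stand-in for $elasticaService->addDocuments()
    private $idleSeconds;

    public function __construct($idleSeconds)
    {
        $this->idleSeconds = $idleSeconds;
        // false: don't auto-restart interrupted system calls, so a blocking read
        // returns instead of postponing the handler (the caller must cope with that).
        pcntl_signal(SIGALRM, function () {
            $this->flush();
        }, false);
    }

    public function add($document)
    {
        $this->pending[] = $document;
        // Each call replaces any earlier alarm: the countdown restarts per message.
        pcntl_alarm($this->idleSeconds);
    }

    public function flush()
    {
        pcntl_alarm(0); // cancel a pending alarm, if any
        $this->flushed += count($this->pending);
        $this->pending = [];
    }
}
```

Two things I would check before relying on this. First, the handler runs between arbitrary statements, possibly in the middle of `onMessage()`, so a more careful version only sets a flag in the handler and lets the consume loop call `flush()`. Second, whether the blocking read in your AMQP client actually returns when the signal arrives; that is why the sketch passes `false` as `pcntl_signal()`'s `$restart_syscalls` argument, and the client then has to tolerate the interrupted call.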