我有一個RabbitMQ代理,在這個代理上發佈不同的消息,這些消息最終將作爲Elasticsearch中的文檔。代理中有多個消費者,這些消費者實際上是分配給amqp入站網關的任務執行程序中的不同線程(在此使用spring集成和spring amqp)。排隊機制和Elasticsearch 1.4.0
認爲在以下情形:我已經創建了一個ES與文檔結構
{
"field1" : "value1",
"field2" : "value2"
}
後來我送兩個更新請求,更新都同場,讓我們說field1
。如果我一個接一個地發送這個消息(生產中的常見用例),我的消費者線程將以正確的順序獲取消息(amqp允許這樣做),但是處理可能發生在錯誤的順序中,並且以後更新的值可能是被第一個覆蓋。我最終會得到數據。
如何確保我的數據不會被損壞? =>擁有1個消費者線程是不夠的,因爲如果我想通過添加更多機器與我的消費應用程序來擴展,我仍然會有多個消費者。我可能需要排序消息,但有多臺機器可能需要創建某種羣集感知組件,我使用SI,所以這在我看來很難實現。
在1.2之前的ES版本中,我們使用了外部版本,比如時間戳,在我的場景中ES會拋出VersionConflictException
:第一次更新的版本應該是10000版本,第二次10001版本,如果第一版本會有ES會拒絕10000版本的請求,因爲它低於現有版本。但從最新版本,ES球員have removed this functionality進行更新操作。
如何使用spring-amqp注入這個散列函數?你能給我一個簡單的例子嗎? – 2014-11-26 11:03:36
只需以某種方式從文檔中計算散列,例如'customerNumber%3'(如果有3個隊列),並使用它在'rabbitTemplate.send ...(...)'方法中構建'routingKey'。 – 2014-11-26 14:22:24
比方說,我發佈到3個不同的隊列,併爲每個隊列註冊1個消費者。如果我有3臺機器來部署我的應用程序,我如何確保只有一個線程會從隊列中獲取消息? – 2014-11-27 07:35:43