2014-11-25 50 views
0

我有一個RabbitMQ代理,在這個代理上發佈不同的消息,這些消息最終將作爲Elasticsearch中的文檔。代理中有多個消費者,這些消費者實際上是分配給amqp入站網關的任務執行程序中的不同線程(在此使用spring集成和spring amqp)。排隊機制和Elasticsearch 1.4.0

認爲在以下情形:我已經創建了一個ES與文檔結構

{ 
    "field1" : "value1", 
    "field2" : "value2" 
} 

後來我送兩個更新請求,更新都同場,讓我們說field1。如果我一個接一個地發送這個消息(生產中的常見用例),我的消費者線程將以正確的順序獲取消息(amqp允許這樣做),但是處理可能發生在錯誤的順序中,並且以後更新的值可能是被第一個覆蓋。我最終會得到數據。

如何確保我的數據不會被損壞? =>擁有1個消費者線程是不夠的,因爲如果我想通過添加更多機器與我的消費應用程序來擴展,我仍然會有多個消費者。我可能需要排序消息,但有多臺機器可能需要創建某種羣集感知組件,我使用SI,所以這在我看來很難實現。

在1.2之前的ES版本中,我們使用了外部版本,比如時間戳,在我的場景中ES會拋出VersionConflictException:第一次更新的版本應該是10000版本,第二次10001版本,如果第一版本會有ES會拒絕10000版本的請求,因爲它低於現有版本。但從最新版本,ES球員have removed this functionality進行更新操作。

回答

1

一個解決方案可能是使用多個隊列並在每個隊列上有一個使用者;使用散列函數總是將更新路由到同一文檔到同一隊列,請參閱RabbitMQ Tutorials瞭解各種選項。

您可以通過添加更多隊列(並更改散列函數)來擴展。

對於彈性,請考慮在Spring XD中運行您的消費者。您可以爲每個兔子源(每個隊列)創建一個實例,如果發生故障,XD將負責將其故障轉移到另一個容器節點。

否則,你可以通過具有熱備份滾你自己 - 與auto-startup="false"配置的入站適配器,並有一些顯示器和使用<control-bus/>啓動一個新的實例,如果處於活動狀態下降。

編輯:

響應於下面的第四評論。

正如我上面所說,要擴展,你將不得不改變散列函數。因此,在運行時自動添加消費者將會非常棘手。

您不必在jar中對隊列名稱進行硬編碼,您可以使用屬性佔位符並從屬性,系統屬性或環境變量中填充它。

該解決方案是最簡單的,但有這些限制。但是,您可以構建一個管理應用程序,可以將其擴展出來 - 停止生產者,等待所有隊列停頓,重新配置使用者並重新啓動生產者--Spring Integration提供了一個<control-bus/>來啓動/停止適配器;你也可以通過JMX來完成。

其他解決方案是可能的,但通常需要維護羣集間的一些共享狀態(可能使用zookeeper等),所以更復雜;而你仍然必須處理競爭條件(第二次更新可能會在第一次之前到達某些消費者)。

+0

如何使用spring-amqp注入這個散列函數?你能給我一個簡單的例子嗎? – 2014-11-26 11:03:36

+0

只需以某種方式從文檔中計算散列,例如'customerNumber%3'(如果有3個隊列),並使用它在'rabbitTemplate.send ...(...)'方法中構建'routingKey'。 – 2014-11-26 14:22:24

+0

比方說,我發佈到3個不同的隊列,併爲每個隊列註冊1個消費者。如果我有3臺機器來部署我的應用程序,我如何確保只有一個線程會從隊列中獲取消息? – 2014-11-27 07:35:43

0

您可以使用缺省機制進行一致性檢查。基本上你想驗證你有最新版本的任何你正在更新。

因此,您需要使用對象獲取_version。在查詢中,您可以通過在頂層設置version = true來完成此操作。這將導致_version與您的查詢結果一起返回。然後,在進行更新時,只需將url中的版本參數設置爲您擁有的值,並且如果不匹配,則會生成版本衝突。

更好的是使用閉包來處理更新。基本上,它的工作原理如下:有一個更新方法,通過id獲取對象,應用封裝(更新函數的參數)封裝您想要做的修改,然後存儲修改後的對象。如果您捕獲仍然可能發生的版本衝突,則可以再次獲取該對象並將該閉包重新應用於該對象。我們這樣做,並在重試之前添加了隨機睡眠,這大大減少了多次更新失敗的機會,並且是一個不錯的設計模式。保持讀取和寫入在一起可以最大限度地減少衝突的可能性,然後在進一步最小化之前重新嘗試睡眠。您可以添加多個重試以進一步降低風險。

+0

這意味着每一次更新操作的調用。如果在同一個文件上有併發更新,發生間隔爲ms怎麼辦?這不會導致數據損壞嗎?另一個問題:你在同一領域有2個更新請求。你會希望第二個是最新的應用。有了這個重試,你的第一次更新最後會不會有變化? – 2014-12-01 08:53:04

+0

如果您正在進行併發更新,則無論如何都需要確保您擁有最新版本,然後才能覆蓋所有內容。最好的方法是在PUT之前做一個GET。在我們的情況下,如果我們不這樣做,我們有很多版本在負載下發生衝突。確實有機會以不同於預期的順序應用併發更新。所有這種模式的保證是1)你正在更新es中的版本。2)沒有其他更新同時被寫入。這是樂觀鎖定的一種形式。您可以在關閉過程中進行額外的檢查。 – 2014-12-01 13:35:48