2014-11-20 93 views
3

我正在開發一個系統,這將涉及大量數據同步分解成小任務。我將每個小任務作爲Azure服務總線隊列上的作業/消息添加。Azure服務總線 - 刪除特定消息

我有X個Worker角色,然後檢查隊列並處理數據。

我不希望隊列中有很多消息,因爲我們的目標是處理消息,完成消息,然後再次添加相同的消息,但計劃在X分鐘時間內完成。這會給我一個循環來繼續處理這些任務。

關於Azure功能的好處是,它們可以處理所有服務器端的東西,但缺點是它有時很難調試或操作數據。

我想要做的是在Web界面中顯示隊列中的消息列表(我已經使用PeekBatch完成)。然後我希望能夠選擇一些/所有的消息並刪除它們。

如果代碼中存在錯誤,我想要這樣做,並且我想停止某種類型的消息停止。

接下來我將具有從網頁重新添加消息的功能。也許我可能想要增加我的工作角色和消息以更快的速度執行任務(或減慢它們),或者重新添加已刪除的消息。

所以,問題是,我該如何從隊列中真正選擇特定的消息,然後將其刪除?從我所看到的情況來看,沒有明顯的方法可以做到這一點,如果可能的話,這將需要某種解決方法。這聽起來有點奇怪。

編輯:

我已經得到了一些作品,但它確實似乎沒有一個很好的解決方案:

public void DeleteMessages(List<long> messageIds) 
    { 
     foreach (var msg in Client.ReceiveBatch(100)) 
     { 
      if (messageIds.Contains(msg.SequenceNumber)) 
       msg.Complete(); // Deletes the message 
      else 
       msg.Abandon(); // Puts it back in the queue 
     } 
    } 

這將讓越來越少的高效較大的隊列,但它至少在刪除呼叫正在進行時停止所有活動並刪除指定的消息。

它也只會刪除準備處理的消息。將來的消息將被忽略,所以我現在添加了添加「睡眠」消息的能力,在我的消息「準備就緒」之前停止處理隊列,並且我可以刪除它們。

微軟已經通知我他們目前正在使用API​​來刪除應該在幾個月內可用的特定消息。在此之前,這完全是關於解決方法。

月更新:

不過從微軟在這個問題和上面的方法沒有更新不夠理想。我現在已經修改了我的代碼,以便:

我放入消息的對象有一個新屬性:

Guid? MessageId { get; set; } 

注意它可爲空的Guid只是爲了向後兼容

當我想刪除一條消息,我將我的MessageId添加到數據庫表「DeletedMessage」中。

說到處理消息,我在DeletedMessage表中查找匹配的Guid,如果它找到一個,我只需完成()消息而不進行正常處理。

這個效果很好,但是開銷很小。如果你不處理大量的消息,這不是一個大問題。

另請注意,我最初通過使用SequenceNumber來完成此操作,但是(奇怪的是)SequenceNumber在查看和檢索消息之間發生了變化!這可以防止這個想法起作用,除非你像上面那樣使用你自己的Id。

回答

0

您可以創建自己的隊列清理程序,該程序具有消息偵聽器,該消息偵聽器根據消息的某些條件查看並放棄或提交消息。如果您預見在實際生產環境中需要清潔,則可能是控制器中的一個操作。

+0

感謝您的回覆,但每當我嘗試查看郵件,然後對郵件執行操作(完成,放棄,延遲)時,它總是會出現錯誤,說它無法在上下文中執行任何操作。 – McGaz 2014-11-21 10:30:17

0

從我的實驗中,不可能從消息隊列中刪除一個活動消息。

通過調用Receive(sequenceNumber)然後調用Complete()可以刪除延期的消息。

通過調用CancelScheduledMessageAsync(sequenceNumber)可以刪除計劃消息。

對於活動消息,您必須確保它們在某些時候移動到死信隊列中。

然後可以刪除或重新提交死信隊列中的消息。