2013-01-04 123 views
2

延遲隊列是一個隊列,其中每個消息都有一個與其相關的延遲時間,並且消息只能在其延遲期滿時才被採用。隊列的頭部是那些延遲過去最長的消息。如果沒有延遲已經過期,則沒有頭,並且出隊將返回空值。使用一個或多個標準FIFO隊列實現延遲隊列

實際上,我正在使用Azure編寫雲應用程序,而在Azure中,只有FIFO隊列可用,而不是優先級/延遲隊列。所以我來到這裏,看看有沒有人可以給我一些指引,讓我可以從正確的方向開始。我搜索了很多,但只發現了Java中的延遲隊列實現,沒有一般的關於延遲隊列的標準教程/研究論文。

編輯:

我有什麼碼?
其實,我必須首先設計這些東西,並將其呈現給我的經理,一旦我們完成設計,那麼只有我可以開始編碼。

更多有關場景
其基於主/從模式的分佈式應用程序的詳細信息。主服務器生成消息並將它們放入Azure服務總線隊列中,並且有多個從服務器(運行在多臺機器上)從隊列中讀取並執行它們。如果萬一主站發生故障,則其中一個從站作爲主站並開始生成消息。我不想在主數據庫中存儲任何狀態信息,因爲如果主數據庫出現故障,所有狀態信息也會隨之一起存儲。

+0

請出示一些源代碼..你有什麼嘗試?什麼不工作/你卡在哪裏? – Yahia

+2

如果你有一個Java實現,它不應該太難轉換爲C#。甚至可能比從頭開始寫自己更容易。 – Servy

+0

延遲隊列的C#實現將類似於Java實現。 –

回答

9

Windows Azure Queue消息在將消息插入隊列時指定了延遲,以秒爲單位。在超時延遲被觸發之前,消息將不可見。請參閱this MSDN article以查看API詳細信息。

隱形超時也在各種語言的SDK實現中實現。由於您正在使用C#,因此調用的內容如下所示。需要注意的AddMessage()第三個參數指定了隱形超時:

 var acct = CloudStorageAccount.DevelopmentStorageAccount; 
     var queueClient = acct.CreateCloudQueueClient(); 
     var queue = queueClient.GetQueueReference("myqueue"); 
     queue.CreateIfNotExist(); 

     var msg = new CloudQueueMessage("test message"); 
     queue.AddMessage(msg, TimeSpan.FromHours(2), TimeSpan.FromMinutes(30)); 
0

你怎麼樣建立一個隊列兩步過程中退出的項目。這裏是高層次的過程:

  • 出隊FIFO隊列中的第一項;將其隱身時間設置爲N分鐘(無論您決定隱身應該是什麼) - 這允許您在一段時間內隱藏物品,就好像它不存在於隊列中一樣。這是我所指的NextVisibleTime屬性。

  • 檢查DequeueCount屬性 - 如果出列計數爲0,那麼這是第一次出現該項目。忽略該項目並繼續前進。由於它的隱身性已經確定,它不會再被提取,直到時間到了。如果出隊計數大於或等於1,它將被出隊一次,並且必須在所需的時間內被設置爲不可見。

這應該允許你實現延遲隊列。我也可以考慮其他方法。例如隊列中的每個項目作爲創建時間;這可以用來動態計算物品需要保持不可見的時間。要更改物業的隱身性,請檢查此方法:http://msdn.microsoft.com/en-us/library/microsoft.windowsazure.storageclient.cloudqueue.updatemessage.aspx

3

因此,首先我們需要實施優先級隊列。這是我不久前寫的一個。這可能不是理想的;它有一個很小的API,它可能會表現得更好,但它是一個足夠的起點:包裝,當

public class PriorityQueue<TPriority, TElement> 
{ 
    SortedDictionary<TPriority, Queue<TElement>> dictionary = new SortedDictionary<TPriority, Queue<TElement>>(); 
    public PriorityQueue() 
    { 
    } 

    public Tuple<TPriority, TElement> Peek() 
    { 
     var firstPair = dictionary.First(); 
     return Tuple.Create(firstPair.Key, firstPair.Value.First()); 
    } 

    public TElement Pop() 
    { 
     var firstPair = dictionary.First(); 
     TElement output = firstPair.Value.Dequeue(); 

     if (!firstPair.Value.Any()) 
      dictionary.Remove(firstPair.Key); 

     return output; 
    } 

    public void Push(TPriority priority, TElement element) 
    { 
     Queue<TElement> queue; 
     if (dictionary.TryGetValue(priority, out queue)) 
     { 
      queue.Enqueue(element); 
     } 
     else 
     { 
      var newQueue = new Queue<TElement>(); 
      newQueue.Enqueue(element); 
      dictionary.Add(priority, newQueue); 
     } 
    } 
} 

延遲隊列很簡單:

public class DelayQueue<T> 
{ 
    private PriorityQueue<DateTime, T> queue = new PriorityQueue<DateTime, T>(); 
    public void Enqueue(T item, int delay) 
    { 
     queue.Push(DateTime.Now.AddMilliseconds(delay), item); 
    } 

    public T Dequeue() 
    { 
     if (queue.Peek().Item1 > DateTime.Now) 
      return queue.Pop(); 
     else 
      return default(T); 
    } 
}