2012-02-03 73 views
0

我有2個問題。以下是該場景 -如何將應用脫機時入隊的郵件出隊(Oracle高級隊列)

有2個不同的進程進程A和進程B. 進程入隊的消息在消息隊列中。處理B從消息隊列中取消隊列中的消息。

1)進程B關閉一段時間,但進程A繼續將消息排入隊列。當進程B實時返回時,如何在進程B處於脫機狀態時將進程A發佈的消息隊列中的消息出列?

2)我使用的隊列是多個消費者隊列,因爲需要多於1個進程B才能使消息出隊。設計背後的原因是如果其中一個進程B死亡,另一個進程B仍然可以繼續處理該消息。同時,如果進程B的一個實例接收到一條消息,它應該通知其他進程B不處理該消息。

我沒有找到任何樣品。任何幫助是極大的讚賞。

回答

0

我剛剛完成了一個具有相當類似要求的項目。

問題1) 我創建了一個Windows服務定時器,它調用WCF Restful服務定期運行。 WCF服務然後將列出所有排隊的內容(每次調用最多500條消息)。任何已入隊的應該自動處理,以便即使此計時器在重新啓動後停止,它也會自動停止。

問題2) 我是從Oracle數據複製到CouchBase所以我不得不進行檢索時的時間戳進程啓動和已經是一個時間戳CouchBase保存的數據,如果第一個是比後者年長那麼就不會保存。 (這是爲了照顧比賽條件)。

在Oracle中,我還有一個觸發器,當某些事情入隊時,它會將ID和入隊時間複製到第二個表中。定期檢查這第二個表,如果一個項目已經在隊列表中出列,但第二個表沒有被更新以在WCF服務的特定時間範圍內反映出來,它會將數據重新排列成在過程中失敗的東西。

如果有幫助,這裏是使用odp.net的wcf安靜服務的示例。

OracleAQQueue _queueObj; 
OracleConnection _connObj; 
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString(); 
_connObj = new OracleConnection(_connString); 
_queueObj = new OracleAQQueue("QUEUENAME", _connObj); 
_connObj.Open(); 

    int i = 0; 
    bool messageAvailable = true; 

    while (messageAvailable && i < 500) 
    { 
    OracleTransaction _txn = _connObj.BeginTransaction(); 
    //Makes dequeue part of transaction 
    _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit; 
    _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME" 
    try 
    { 
     //Wait number of seconds for dequeue, default is forever 
     _queueObj.DequeueOptions.Wait = 2; 
     _queueObj.MessageType = OracleAQMessageType.Raw; 
     _queueObj.DequeueOptions.ProviderSpecificType = true; 
     OracleAQMessage _depMsq = _queueObj.Dequeue(); 
     var _binary = (OracleBinary)_depMsq.Payload; 
     byte[] byteArray = _binary.Value; 
     _txn.Commit(); 
    } 
    catch (Exception ex) 
    { 
     //This catch will always fire when all messages have been dequeued 
     messageAvailable = false; 
     if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1) 
      { 
      //Actual error present. 
      log.Info("Problem occurred during dequeue process : " + ex.Message); 
      } 
    } 
    } 

    _queueObj.Dispose(); 
    _connObj.Close(); 
    _connObj.Dispose(); 
    _connObj = null;