我剛剛完成了一個具有相當類似要求的項目。
問題1) 我創建了一個Windows服務定時器,它調用WCF Restful服務定期運行。 WCF服務然後將列出所有排隊的內容(每次調用最多500條消息)。任何已入隊的應該自動處理,以便即使此計時器在重新啓動後停止,它也會自動停止。
問題2) 我是從Oracle數據複製到CouchBase所以我不得不進行檢索時的時間戳進程啓動和已經是一個時間戳CouchBase保存的數據,如果第一個是比後者年長那麼就不會保存。 (這是爲了照顧比賽條件)。
在Oracle中,我還有一個觸發器,當某些事情入隊時,它會將ID和入隊時間複製到第二個表中。定期檢查這第二個表,如果一個項目已經在隊列表中出列,但第二個表沒有被更新以在WCF服務的特定時間範圍內反映出來,它會將數據重新排列成在過程中失敗的東西。
如果有幫助,這裏是使用odp.net的wcf安靜服務的示例。
OracleAQQueue _queueObj;
OracleConnection _connObj;
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString();
_connObj = new OracleConnection(_connString);
_queueObj = new OracleAQQueue("QUEUENAME", _connObj);
_connObj.Open();
int i = 0;
bool messageAvailable = true;
while (messageAvailable && i < 500)
{
OracleTransaction _txn = _connObj.BeginTransaction();
//Makes dequeue part of transaction
_queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
_queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME"
try
{
//Wait number of seconds for dequeue, default is forever
_queueObj.DequeueOptions.Wait = 2;
_queueObj.MessageType = OracleAQMessageType.Raw;
_queueObj.DequeueOptions.ProviderSpecificType = true;
OracleAQMessage _depMsq = _queueObj.Dequeue();
var _binary = (OracleBinary)_depMsq.Payload;
byte[] byteArray = _binary.Value;
_txn.Commit();
}
catch (Exception ex)
{
//This catch will always fire when all messages have been dequeued
messageAvailable = false;
if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1)
{
//Actual error present.
log.Info("Problem occurred during dequeue process : " + ex.Message);
}
}
}
_queueObj.Dispose();
_connObj.Close();
_connObj.Dispose();
_connObj = null;