2017-03-12

Asynchronously updating or inserting a MongoDB document with the .NET driver

I have a document like this:

public class SomeDocument 
{ 
    public Guid Id { get; set; } 
    public string PropertyA { get; set; } 
    public string PropertyB { get; set; } 
} 

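Now I have two different services (A and B) that run asynchronously and update PropertyA and PropertyB respectively. This means I don't know which service will finish first, and therefore which one should create the document and which one should update it.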

So, to update (or create) the document, I currently use code like this in service A:

var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true }; 
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value"); 

await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options); 

and the following code in service B:

var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true }; 
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value"); 

await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options); 

Everything looks fine, but sometimes, when both services run at the same time, I get the following error:

Unhandled Exception: MongoDB.Driver.MongoCommandException: Command findAndModify failed: E11000 duplicate key error collection: someDocuments index: _id_ dup key: { : BinData(3, B85ED193195A274DA94BC86B655B4509) }. 
    at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.ProcessReply(ConnectionId connectionId, ReplyMessage`1 reply) 
    at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.<ExecuteAsync>d__11.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Servers.Server.ServerChannel.<ExecuteProtocolAsync>d__26`1.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Operations.CommandOperationBase`1.<ExecuteProtocolAsync>d__29.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Operations.WriteCommandOperation`1.<ExecuteAsync>d__2.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Operations.FindAndModifyOperationBase`1.<ExecuteAsync>d__19.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.OperationExecutor.<ExecuteWriteOperationAsync>d__3`1.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.MongoCollectionImpl`1.<ExecuteWriteOperationAsync>d__62`1.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult() 
    at CVSP.MongoDbStore.MongoDbWriteModelFacade.<AddRecordField>d__6.MoveNext() in D:\Projects\Test\Source\MongoDbStore\WriteModel\MongoDbWriteModelFacade.cs:line 58 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.<>c.<ThrowAsync>b__6_1(Object state) 
    at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state) 
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) 
    at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem() 
    at System.Threading.ThreadPoolWorkQueue.Dispatch() 
    at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() 

How should I insert/update the document in this case?

UPDATE

An extension method that wraps the call in a try/catch did the trick:

public static async Task<TProjection> FindOneAndUpdateWithConcurrencyAsync<TDocument, TProjection>(this IMongoCollection<TDocument> collection, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
{
    try
    {
        return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
    }
    catch (MongoException)
    {
        // Assumed cause: a concurrent upsert won the race on the unique _id index.
        // Pause without blocking the thread (Task.Delay instead of Thread.Sleep),
        // then retry once; the document should now exist, so the retry updates it.
        await Task.Delay(10, cancellationToken);

        return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
    }
}

It looks odd, and I didn't like it at first, but after reading https://docs.mongodb.com/manual/reference/method/db.collection.findAndModify/#upsert-and-unique-index all my doubts went away.

Answer

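OK, this is a synchronization problem, and unfortunately there is no simple solution. To find a workaround, let's look at what may be happening on the back end.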

Assume we have two threads (services) trying to upsert the same document.

t1: 00:00:00.250 -> find document with Id (1)
t2: 00:00:00.255 -> find document with Id (1)

t1: 00:00:00.260 -> No document found
t2: 00:00:00.262 -> No document found

t1: 00:00:00.300 -> Insert a document with Id (1)
t2: 00:00:00.300 -> Insert a document with Id (1)

Bingo... we get the exception. Both threads tried to insert a document with the same Id.
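The interleaving above can be replayed deterministically without a database. The following is only a sketch: `InMemoryCollection` and `UpsertRace` are hypothetical names, and a `Dictionary` whose `Add` throws on a duplicate key stands in for the unique `_id` index.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a collection with a unique index on Id.
public sealed class InMemoryCollection
{
    private readonly Dictionary<int, string> _documents = new Dictionary<int, string>();

    public bool Find(int id) => _documents.ContainsKey(id);

    public void Insert(int id, string value)
    {
        // Dictionary.Add throws ArgumentException on a duplicate key,
        // standing in for the server's E11000 error.
        _documents.Add(id, value);
    }
}

public static class UpsertRace
{
    // Replays the t1/t2 timeline step by step and reports whether t2's insert failed.
    public static bool SecondInsertFails()
    {
        var collection = new InMemoryCollection();

        var t1Found = collection.Find(1); // t1: find document with Id (1), not found
        var t2Found = collection.Find(1); // t2: find document with Id (1), not found

        if (!t1Found) collection.Insert(1, "from t1"); // t1: insert succeeds

        try
        {
            // t2 still acts on its stale "not found" result.
            if (!t2Found) collection.Insert(1, "from t2");
            return false;
        }
        catch (ArgumentException)
        {
            return true; // duplicate key, the analogue of E11000
        }
    }
}
```

Both finds run before either insert, so the second insert is based on a stale answer. That is the window in which the duplicate key error appears.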

So what can we do here?

Let's turn this shortcoming to our advantage. Catch the exception and call the upsert again. This time it will find the document and update it.
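One way to express the catch-and-retry idea in a reusable form is a small helper that knows nothing about the driver. This is a sketch under assumptions: `UpsertRetry`, `ExecuteAsync`, and the `maxAttempts` and `delayMilliseconds` parameters are hypothetical names, and the caller supplies the predicate that decides what counts as a duplicate key error.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UpsertRetry
{
    // Runs `operation`; if it throws an exception that `isDuplicateKey` recognizes,
    // waits briefly and tries again, up to `maxAttempts` attempts in total.
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, bool> isDuplicateKey,
        int maxAttempts = 3,
        int delayMilliseconds = 10,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation().ConfigureAwait(false);
            }
            catch (Exception ex) when (attempt < maxAttempts && isDuplicateKey(ex))
            {
                // Another writer inserted the document first; the next attempt
                // should match the existing document and update it.
                await Task.Delay(delayMilliseconds, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

With the .NET driver, the predicate could plausibly check `MongoCommandException.Code == 11000` for `FindOneAndUpdateAsync`, or `MongoWriteException.WriteError.Category == ServerErrorCategory.DuplicateKey` for `UpdateOneAsync`. Verify those checks against your driver version. Filtering on the error type keeps unrelated failures, such as network errors, from being retried silently. On the final attempt the exception is rethrown to the caller.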

I modified the code into ServiceA and ServiceB as follows and tried inserting 10,000 documents in a tight loop:

public async Task ServiceA(Guid id) 
{ 
    var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
    var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value"); 

    var options = new UpdateOptions() { IsUpsert = true }; 
    var database = _client.GetDatabase("stackoverflow"); 
    var collection = database.GetCollection<SomeDocument>(CollectionName, 
     new MongoCollectionSettings 
     { 
      WriteConcern = WriteConcern.W1 
     }); 

    await collection.UpdateOneAsync(filter, update, options); 
} 

public async Task ServiceB(Guid id) 
{ 
    var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
    var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value"); 

    var options = new UpdateOptions() { IsUpsert = true }; 
    var database = _client.GetDatabase("stackoverflow"); 
    var collection = database.GetCollection<SomeDocument>(CollectionName, 
     new MongoCollectionSettings 
     { 
      WriteConcern = WriteConcern.W1 
     }); 

    await collection.UpdateOneAsync(filter, update, options); 
} 

Here is my launching code. It isn't perfect, but it serves the purpose.

for (var i = 0; i < 10000; i++)
{
    var guid = Guid.NewGuid();

    // Task.Run accepts a Func<Task>, so each task completes only when its async
    // lambda has finished. new Task(async x => ...) would wrap an async void
    // delegate, and Task.WaitAll would return before the upserts were done.
    var tasks = new[]
    {
        Task.Run(async () =>
        {
            var p = new Program();
            try
            {
                await p.ServiceA(guid);
            }
            catch (MongoWriteException)
            {
                // Lost the upsert race; wait briefly and retry as an update.
                await Task.Delay(5);
                await p.ServiceA(guid);
            }
        }),
        Task.Run(async () =>
        {
            var p = new Program();
            try
            {
                await p.ServiceB(guid);
            }
            catch (MongoWriteException)
            {
                // Lost the upsert race; wait briefly and retry as an update.
                await Task.Delay(5);
                await p.ServiceB(guid);
            }
        })
    };

    Task.WaitAll(tasks);
}

Thanks for the solution, @Saleem... I'll try it. I'll move the try/catch inside the service methods :)

Sure, you can modify it as needed. As I mentioned in my note about the launching code, it isn't perfect, but it serves its purpose. – Saleem