2015-06-20 71 views
0

我從一個工作示例中獲得了一些代碼片段。此示例是在服務結構中創建REST呼叫(WebAPI)和輪詢器以輪詢請求。有五個參與者(1)FileImportValidator驗證文件名(2)FileParser來解析文件(3)AgeValidator驗證年齡(4)FilePersister堅持的姓名和年齡作爲事件。Service Fabric,Akka.net和Persistent actor integration

請分享此設計是否適用於事件源系統的AKKA.NET角色建模。

PS。要解析的文件已經上傳。 REST調用僅提供文件名。我有意刪除了一些驗證邏輯。

//WebAPI: 

     [HttpPost] 
     [Route("import")] 
     public async Task<IHttpActionResult> Import(FileImportRequest request) 
     { 
      IReliableQueue<string> queue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue"); 

      using (ITransaction tx = stateManager.CreateTransaction()) 
      { 
       await queue.EnqueueAsync(tx, request.FileName); 

       await tx.CommitAsync(); 
      } 

      return Ok(); 
     } 

    // Poller in Microsoft Service Fabric MicroService: 

    public class FileImportMicroService : StatefulService 
    { 
     public FileImportMicroService() 
     { 
      domainActorSystem = ActorSystem.Create("DomainActorSystem"); 

      fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator"); 
     } 

     protected override ICommunicationListener CreateCommunicationListener() 
     { 
      ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name); 

      return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager)); 
     } 

     protected override async Task RunAsync(CancellationToken cancellationToken) 
     { 
      var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue"); 

      while (!cancellationToken.IsCancellationRequested) 
      { 
       using (ITransaction tx = this.StateManager.CreateTransaction()) 
       { 
        ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx); 

        if (dequeuReply.HasValue) 
        { 
         FileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value)); 
        } 

        ServiceEventSource.Current.Message(dequeuReply.Value); 

        await tx.CommitAsync(); 
       } 

       await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); 
      } 
     } 

     ActorSystem domainActorSystem; 

     IActorRef fileImportValidator; 
    } 


//FileImportValidator Actor 

    public class FileImportValidator : UntypedActor 
    { 
     protected override void OnReceive(object message) 
     { 
      Handle((dynamic) message); 
     } 

     public void Handle(ValidateFileCommand command) 
     { 
      _fileParser = Context.ActorOf(Props.Create(() => new FileParser())); 

      ... 

      _fileParser.Tell(new ValidationSuccessfulEvent(command.FileName)); 
     } 

     private IActorRef _fileParser; 
    } 

//FileParser Actor: 

    public class FileParser : UntypedActor 
    { 
     private IActorRef _ageValidator; 

     protected override void OnReceive(object message) 
     { 
      Handle((dynamic) message); 
     } 

     public void Handle(ValidationSuccessfulEvent message) 
     { 
      var lines = File.ReadLines(message.FileName); 

      foreach(var line in lines) 
      { 
       var cols = line.Split(','); 

       var File = new { Name = cols[0], Age = cols[1] }; 

       _ageValidator.Tell(new ValidateAge(File.Name, File.Age)); 
      } 
     } 

     protected override void PreStart() 
     { 
      _ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator())); 

      base.PreStart(); 
     } 
    } 

//AgeValidator Actor: 

    public class AgeValidator : UntypedActor 
    { 
     protected override void OnReceive(object message) 
     { 
      if (message is ValidateAge) 
      { 
       _filePersistor.Tell(new SaveNameAndAge(message)); 
      } 
     } 

     protected override void PreStart() 
     { 
      _filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor"); 

      base.PreStart(); 
     } 

     private IActorRef _filePersistor; 
    } 

//Persistent Actor: 

    public class FilePersistor : PersistentActor 
    { 
... 
     protected override bool ReceiveCommand(object message) 
     { 
      Persist(/* Handler to persist name and age */); 

      return true; 
     } 
... 
    } 

回答

2

你可以考慮另一種方法是使用ReliableDictionary的服務,以「堅持」系統(已處理的文件)的狀態。上傳新文件時,您將創建一個新角色,並傳遞一個FileId,以便演員可以檢索數據並對其進行處理。完成後,它會調用服務,以便將項目從列表中刪除。這樣你可以並行處理文件。

+0

我相信你建議創建儘可能多的'IActorRef fileImportValidator'並行處理文件,而不是使用單個fileImportValidator從隊列中一次處理一個文件。艾米我是對嗎? –

+1

正確,這個想法是爲了避免有順序管道的潛在瓶頸。你有機會並行使用多個角色。 Akka/Akk.net爲路由器提供可靠的參與者,您依賴於服務結構的「運行時」,通過分區爲您提供規模。 – clca

相關問題