0
我從一個工作示例中獲得了一些代碼片段。此示例是在服務結構中創建REST呼叫(WebAPI)和輪詢器以輪詢請求。有五個參與者(1)FileImportValidator驗證文件名(2)FileParser來解析文件(3)AgeValidator驗證年齡(4)FilePersister堅持的姓名和年齡作爲事件。Service Fabric,Akka.net和Persistent actor integration
請分享此設計是否適用於事件源系統的AKKA.NET角色建模。
PS。要解析的文件已經上傳。 REST調用僅提供文件名。我有意刪除了一些驗證邏輯。
//WebAPI:
[HttpPost]
[Route("import")]
public async Task<IHttpActionResult> Import(FileImportRequest request)
{
IReliableQueue<string> queue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
using (ITransaction tx = stateManager.CreateTransaction())
{
await queue.EnqueueAsync(tx, request.FileName);
await tx.CommitAsync();
}
return Ok();
}
// Poller in Microsoft Service Fabric MicroService:
public class FileImportMicroService : StatefulService
{
public FileImportMicroService()
{
domainActorSystem = ActorSystem.Create("DomainActorSystem");
fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
}
protected override ICommunicationListener CreateCommunicationListener()
{
ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);
return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
while (!cancellationToken.IsCancellationRequested)
{
using (ITransaction tx = this.StateManager.CreateTransaction())
{
ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx);
if (dequeuReply.HasValue)
{
FileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value));
}
ServiceEventSource.Current.Message(dequeuReply.Value);
await tx.CommitAsync();
}
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
ActorSystem domainActorSystem;
IActorRef fileImportValidator;
}
//FileImportValidator Actor
public class FileImportValidator : UntypedActor
{
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
public void Handle(ValidateFileCommand command)
{
_fileParser = Context.ActorOf(Props.Create(() => new FileParser()));
...
_fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
}
private IActorRef _fileParser;
}
//FileParser Actor:
public class FileParser : UntypedActor
{
private IActorRef _ageValidator;
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
public void Handle(ValidationSuccessfulEvent message)
{
var lines = File.ReadLines(message.FileName);
foreach(var line in lines)
{
var cols = line.Split(',');
var File = new { Name = cols[0], Age = cols[1] };
_ageValidator.Tell(new ValidateAge(File.Name, File.Age));
}
}
protected override void PreStart()
{
_ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));
base.PreStart();
}
}
//AgeValidator Actor:
public class AgeValidator : UntypedActor
{
protected override void OnReceive(object message)
{
if (message is ValidateAge)
{
_filePersistor.Tell(new SaveNameAndAge(message));
}
}
protected override void PreStart()
{
_filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");
base.PreStart();
}
private IActorRef _filePersistor;
}
//Persistent Actor:
public class FilePersistor : PersistentActor
{
...
protected override bool ReceiveCommand(object message)
{
Persist(/* Handler to persist name and age */);
return true;
}
...
}
我相信你建議創建儘可能多的'IActorRef fileImportValidator'並行處理文件,而不是使用單個fileImportValidator從隊列中一次處理一個文件。艾米我是對嗎? –
正確,這個想法是爲了避免有順序管道的潛在瓶頸。你有機會並行使用多個角色。 Akka/Akk.net爲路由器提供可靠的參與者,您依賴於服務結構的「運行時」,通過分區爲您提供規模。 – clca