2016-02-16 57 views
2

我在Akka.Net中有一個層次結構的演員,並且想知道我是否選擇了正確的方式來做某件事,或者是否有更好/更簡單的方法來實現我想要的。如何使用Akka.Net執行WaitAll?

我的具體示例是,我構建了一個用戶 actor以響應用戶登錄到系統中,並且在構建此actor時,爲了完成actor的構建需要兩條數據。

如果這是常規的.NET代碼,我可能會有諸如以下...

public Task<User> LoadUserAsync (string username) 
{ 
    IProfileService profileService = ...; 
    IMessageService messageService = ...; 

    var loadProfileTask = profileService.GetUserProfileAsync(username); 
    var loadMessagesTask = messageService.GetMessagesAsync(username); 

    Task.WaitAll(loadProfileTask, loadMessagesTask); 

    // Now construct the user from the result of both tasks 
    var user = new User 
    { 
    Profile = loadProfileTask.Result, 
    Messages = loadMessagesTask.Result 
    } 

    return Task.FromResult(user); 
} 

這裏我用了WaitAll等待下屬任務來完成,並讓他們同時運行。

我的問題是 - 如果我想在Akka.Net中做同樣的事情,下面是最常用的方法嗎?形象地我已經創建了下面......

Actor Hierarchy

當我創建我的用戶的演員,我則構造一個(臨時)用戶加載演員,他們的工作是通過調用來獲得完整的用戶詳細信息配置文件actor和Messages actor。葉行爲者獲取的數據如下...

public class UserProfileLoader : ReceiveActor 
{ 
    public UserProfileLoader() 
    { 
     Receive<LoadUserRequest>(msg => 
     { 
      // Load the user profile from somewhere 
      var profile = new UserProfile(); 

      // And respond to the Sender 
      Sender.Tell(profile); 
      Self.Tell(PoisonPill.Instance); 
     }); 
    } 
} 

public class UserMessagesLoader : ReceiveActor 
{ 
    public UserMessagesLoader() 
    { 
     Receive<LoadUserRequest>(msg => 
     { 
      // Load the messages from somewhere 
      var messages = new List<Message>(); 

      // And respond to the Sender 
      Sender.Tell(messages); 
      Self.Tell(PoisonPill.Instance); 
     }); 
    } 
} 

它並不真正身在何處,他們從這次討論中的數據,但兩者簡單地將一些數據響應請求。

然後我有一個協調兩個數據採集演員的演員...

public class UserLoaderActor : ReceiveActor 
{ 
    public UserLoaderActor() 
    { 
     Receive<LoadUserRequest>(msg => LoadProfileAndMessages(msg)); 
     Receive<UserProfile>(msg => 
     { 
      _profile = msg; 
      FinishIfPossible(); 
     }); 

     Receive<List<Message>>(msg => 
     { 
      _messages = msg; 
      FinishIfPossible(); 
     }); 
    } 

    private void LoadProfileAndMessages(LoadUserRequest msg) 
    { 
     _originalSender = Sender; 
     Context.ActorOf<UserProfileLoader>().Tell(msg); 
     Context.ActorOf<UserMessagesLoader>().Tell(msg); 
    } 

    private void FinishIfPossible() 
    { 
     if ((null != _messages) && (null != _profile)) 
     { 
      _originalSender.Tell(new LoadUserResponse(_profile, _messages)); 
      Self.Tell(PoisonPill.Instance); 
     } 
    } 

    private IActorRef _originalSender; 
    private UserProfile _profile; 
    private List<Message> _messages; 
} 

這只是創建兩個下屬演員,就給他們一個消息讓開裂,然後等待兩至之前作出響應將收集的所有數據發送回原始請求者。

那麼,這似乎是一個合理的方式來協調兩個不同的答案,以便將它們結合起來嗎?有沒有更容易的方法來做到這一點,而不是自己製作它?

在此先感謝您的回覆!

+0

我不知道你是否需要單獨的演員作爲接收只有2個回覆fasade。無法'用戶'角色直接發送這兩個請求? – Horusiath

+0

我確實可以讓用戶演員完成所有這些工作,但由於用戶演員最有可能有更多的東西要做,我選擇將這裏的責任分配給Loader。不管工作分配如何,上述內容是否有意義並且/或者是否有更好/更少的代碼/更標準的方法來執行此操作? –

+1

配置文件和消息參與者中是否有非常具體的內容,或者他們只從數據庫中獲取數據? 如果是這樣,我可能只是將它包裝在一些在兩個子任務上執行'WhenAll'的異步方法,然後返回該任務,這樣調用者可以在完成時執行'PipeTo(Self)'。我知道我們宣揚「push dangerous工作給孩子「,但一個需要考慮,如果你真的從中受益,或者如果它只是導致代碼膨脹.. –

回答

1

恕我直言,這是一個有效的過程,當你叉動作,然後加入它。

順便說一句,你可以使用this.Self.GracefulStop(new TimeSpan(1));,而不是送毒丸。

+0

謝謝 - 我不想看停止演員的方式,讀過這個 - http:// getakka .net/docs/working-with-actors/stopping-actors我已將代碼更改爲Context.Stop(Self)。 –

0

你可以用問,WhenAll和PipeTo的組合:

var task1 = actor1.Ask<Result1>(request1); 
var task2 = actor2.Ask<Result2>(request2); 

Task.WhenAll(task1, task2) 
    .ContinueWith(_ => new Result3(task1.Result, task2.Result)) 
    .PipeTo(Self); 

... 

Receive<Result3>(msg => { ... }); 
2

感謝鄉親,所以現在我已經簡化演員顯著爲以下,同時基於羅傑和傑夫的建議...

public class TaskBasedUserLoader : ReceiveActor 
{ 
    public TaskBasedUserLoader() 
    { 
     Receive<LoadUserRequest>(msg => LoadProfileAndMessages(msg)); 
    } 

    private void LoadProfileAndMessages(LoadUserRequest msg) 
    { 
     var originalSender = Sender; 
     var loadPreferences = this.LoadProfile(msg.UserId); 
     var loadMessages = this.LoadMessages(msg.UserId); 

     Task.WhenAll(loadPreferences, loadMessages) 
      .ContinueWith(t => new UserLoadedResponse(loadPreferences.Result, loadMessages.Result), 
       TaskContinuationOptions.AttachedToParent & TaskContinuationOptions.ExecuteSynchronously) 
      .PipeTo(originalSender); 
    } 

    private Task<UserProfile> LoadProfile(string userId) 
    { 
     return Task.FromResult(new UserProfile { UserId = userId }); 
    } 

    private Task<List<Message>> LoadMessages(string userId) 
    { 
     return Task.FromResult(new List<Message>()); 
    } 
} 

LoadProfile和LoadMessages方法將最終調用一個存儲庫來獲取數據,但現在我有一個簡潔的方法來做我想做的事情。

再次感謝!

+0

很高興幫助。注意AttachedToParent,這裏的「父」不是Task.WhenAll,它是當前調用LoadProfileAndMessages的任務(在這種情況下沒有)。你用&代替|結合旗幟,所以你最終爲零。 –

+0

很棒的地方,謝謝! –

+0

順便說一下,我使用TaskContinuationOptions是從Petabridge文章中使用PipeTo複製的 - https://petabridge.com/blog/akkadotnet-async-actors-using-pipeto/。所以我想這也需要更新,因爲這裏也有兩個同樣的問題。 –