2017-07-17 59 views
0

聲明:我是新來的阿卡:)其次消息成爲未處理的在我的Akka.Net演員,然後似乎停止

我想實現在阿卡路由器,基本上

  1. 收到IActorRef消息
  2. 查找字典中如果沒有找到匹配的是處理郵件
  3. 的類型,使用Akka.DI作爲兒童演員創造一個添加到字典中
  4. 轉發郵件到該行爲或

這個偉大的工程 - 第一次,但如果我試圖告訴()或向()路由器的兩倍,第二個消息總是流作爲未處理

在結束了我試着覆蓋子actor中的Unhandled()並在其中放置一個斷點,實際上正在對第二條消息進行打擊。

路由器:

public class CommandRouter : UntypedActor 
{ 
    protected readonly IActorResolver _resolver; 
    private static readonly Dictionary<Type, IActorRef> _routees = new Dictionary<Type, IActorRef>(); 
    private ILoggingAdapter _log = Context.GetLogger(new SerilogLogMessageFormatter()); 

    public CommandRouter(IActorResolver resolver) 
    { 
     _resolver = resolver; 
    } 

    protected override void OnReceive(object message) 
    { 
     _log.Info("Routing command {cmd}", message); 
     var typeKey = message.GetType(); 

     if (!_routees.ContainsKey(typeKey)) 
     { 
      var props = CreateActorProps(typeKey); 

      if (!props.Any()) 
      { 
       Sender?.Tell(Response.WithException(
        new RoutingException(
         $"Could not route message to routee. No routees found for message type {typeKey.FullName}"))); 
       return; 
      } 

      if (props.Count() > 1) 
      { 
       Sender?.Tell(Response.WithException(
        new RoutingException(
         $"Multiple routees registered for message {typeKey.FullName}, which is not supported by this router. Did you want to publish stuff instead?"))); 
       return; 
      } 

      var prop = props.First(); 
      var routee = Context.ActorOf(prop, prop.Type.Name); 
      _routees.Add(typeKey, routee); 
     } 

     _routees[typeKey].Forward(message); 

    } 

    private IEnumerable<Props> CreateActorProps(Type messageType) 
    { 
     return _resolver.TryCreateActorProps(typeof(IHandleCommand<>).MakeGenericType(messageType)).ToList(); 
    } 

    protected override SupervisorStrategy SupervisorStrategy() 
    { 
     return new OneForOneStrategy(x => Directive.Restart); 
    } 
} 

的ActorResolver方法,它使用DependencyResolver從Akka.DI.StructureMap:

public IEnumerable<Props> TryCreateActorProps(Type actorType) 
{ 
    foreach (var type in _container.GetAllInstances(actorType)) 
    { 
     yield return _resolver.Create(type.GetType()); 
    } 
} 

實際的兒童演員是相當straigt前鋒:

public class ProductSubscriptionHandler : ReceiveActor, IHandleCommand<AddProductSubscription> 
{ 
    public ProductSubscriptionHandler() 
    { 
     Receive<AddProductSubscription>(Handle); 
    } 

    protected bool Handle(AddProductSubscription command) 
    { 
     Sender?.Tell(Response.Empty); 
     return true; 
    } 
} 

整個事情在演員系統初始化後調用,如下所示:

var router = Sys.ActorOf(resolver.Create<CommandRouter>(), ActorNames.CommandRouter); 

router.Ask(new AddProductSubscription()); 
router.Ask(new AddProductSubscription()); 

我一直得到這個錯誤在第二(或任何後續)消息:「未處理的消息來自...」:

[INFO][17-07-2017 23:05:39][Thread 0003][[akka://pos-system/user/CommandRouter#676182398]] Routing command Commands.AddProductSubscription 
[DEBUG][17-07-2017 23:05:39][Thread 0003][akka://pos-system/user/CommandRouter] now supervising akka://pos-system/user/CommandRouter/ProductSubscriptionHandler 
[DEBUG][17-07-2017 23:05:39][Thread 0003][akka://pos-system/user/CommandRouter] *Unhandled message from akka://pos-system/temp/d* : Documents.Commands.AddProductSubscription 
[DEBUG][17-07-2017 23:05:39][Thread 0007][akka://pos-system/user/CommandRouter/ProductSubscriptionHandler] Started (Consumers.Service.Commands.ProductSubscriptionHandler) 
+0

恐怕那裏有點太不熟悉了我。 (我沒有使用過無類型的演員,也沒有使用DI。)但是,理論上,你不應該將_routees變量聲明爲* static *(因爲那樣你可以在actor-threads和線程之間共享它)。不過,我懷疑這是你的問題。 (如果是我,我會通過在try catch塊中包裝所有東西來進行調查,也許會覆蓋一些生命週期(prerestart?)方法,額外的日誌記錄和thread.sleeps。)這可能是一件愚蠢的事情:-)不要詢問調用需要等待他們嗎? – mwardm

+0

這段代碼很難說出什麼問題。你能在github上提供一個複製例子嗎?我想,這會讓事情變得更快。 此外,如果您有計劃使用此設計進行分發,您可能有興趣使用Akka.Cluster.Sharding插件,它與您的代碼具有相同的功能。 – Horusiath

回答

0

所以,原來有一個更簡單的(和工作)解決我的問題:只需在CommandRouter構造函數中註冊並啓動所有的routee actor而不是per-receive。

所以現在我的代碼看起來簡單得多太:

CommandRouter:

public class CommandRouterActor : UntypedActor 
{ 
    public Dictionary<Type, IActorRef> RoutingTable { get; } 
    private ILoggingAdapter _log = Context.GetLogger(new SerilogLogMessageFormatter()); 

    public CommandRouterActor(IActorResolver resolver) 
    { 
     var props = resolver.CreateCommandHandlerProps(); 
     RoutingTable = props.ToDictionary(k => k.Item1, v => Context.ActorOf(v.Item2, $"CommandHandler-{v.Item1.Name}")); 
    } 

    protected override void OnReceive(object message) 
    { 
     _log.Info("Routing command {cmd}", message); 
     var typeKey = message.GetType(); 

     if (!RoutingTable.ContainsKey(typeKey)) 
     { 
       Sender?.Tell(Response.WithException(
        new RoutingException(
         $"Could not route message to routee. No routees found for message type {typeKey.FullName}"))); 

       _log.Info("Could not route command {cmd}, no routes found", message); 
     } 

     RoutingTable[typeKey].Forward(message); 
    } 

    protected override SupervisorStrategy SupervisorStrategy() 
    { 
     return new OneForOneStrategy(x => Directive.Restart); 
    } 
} 

而且我ActorResolver(在上面的構造函數中使用)只是IHandleCommand<>queries the StructureMap model and ask for all registered instances

public IEnumerable<Tuple<Type, Props>> CreateCommandHandlerProps() 
    { 
     var handlerTypes = 
      _container.Model.AllInstances.Where(
        i => 
         i.PluginType.IsGenericType && i.PluginType.GetGenericTypeDefinition() == 
         typeof(IHandleCommand<>)) 
       .Select(m => m.PluginType); 

     foreach (var handler in handlerTypes) 
     { 
      yield return new Tuple<Type, Props>(handler.GenericTypeArguments.First(), _resolver.Create(handler)); 
     } 
    }