聲明:我是新來的阿卡:)其次消息成爲未處理的在我的Akka.Net演員,然後似乎停止
我想實現在阿卡路由器,基本上
- 收到IActorRef消息
- 查找字典中如果沒有找到匹配的是處理郵件
- 的類型,使用Akka.DI作爲兒童演員創造一個添加到字典中
- 轉發郵件到該行爲或
這個偉大的工程 - 第一次,但如果我試圖告訴()或向()路由器的兩倍,第二個消息總是流作爲未處理
在結束了我試着覆蓋子actor中的Unhandled()並在其中放置一個斷點,實際上正在對第二條消息進行打擊。
路由器:
public class CommandRouter : UntypedActor
{
protected readonly IActorResolver _resolver;
private static readonly Dictionary<Type, IActorRef> _routees = new Dictionary<Type, IActorRef>();
private ILoggingAdapter _log = Context.GetLogger(new SerilogLogMessageFormatter());
public CommandRouter(IActorResolver resolver)
{
_resolver = resolver;
}
protected override void OnReceive(object message)
{
_log.Info("Routing command {cmd}", message);
var typeKey = message.GetType();
if (!_routees.ContainsKey(typeKey))
{
var props = CreateActorProps(typeKey);
if (!props.Any())
{
Sender?.Tell(Response.WithException(
new RoutingException(
$"Could not route message to routee. No routees found for message type {typeKey.FullName}")));
return;
}
if (props.Count() > 1)
{
Sender?.Tell(Response.WithException(
new RoutingException(
$"Multiple routees registered for message {typeKey.FullName}, which is not supported by this router. Did you want to publish stuff instead?")));
return;
}
var prop = props.First();
var routee = Context.ActorOf(prop, prop.Type.Name);
_routees.Add(typeKey, routee);
}
_routees[typeKey].Forward(message);
}
private IEnumerable<Props> CreateActorProps(Type messageType)
{
return _resolver.TryCreateActorProps(typeof(IHandleCommand<>).MakeGenericType(messageType)).ToList();
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(x => Directive.Restart);
}
}
的ActorResolver方法,它使用DependencyResolver從Akka.DI.StructureMap:
public IEnumerable<Props> TryCreateActorProps(Type actorType)
{
foreach (var type in _container.GetAllInstances(actorType))
{
yield return _resolver.Create(type.GetType());
}
}
實際的兒童演員是相當straigt前鋒:
public class ProductSubscriptionHandler : ReceiveActor, IHandleCommand<AddProductSubscription>
{
public ProductSubscriptionHandler()
{
Receive<AddProductSubscription>(Handle);
}
protected bool Handle(AddProductSubscription command)
{
Sender?.Tell(Response.Empty);
return true;
}
}
整個事情在演員系統初始化後調用,如下所示:
var router = Sys.ActorOf(resolver.Create<CommandRouter>(), ActorNames.CommandRouter);
router.Ask(new AddProductSubscription());
router.Ask(new AddProductSubscription());
我一直得到這個錯誤在第二(或任何後續)消息:「未處理的消息來自...」:
[INFO][17-07-2017 23:05:39][Thread 0003][[akka://pos-system/user/CommandRouter#676182398]] Routing command Commands.AddProductSubscription
[DEBUG][17-07-2017 23:05:39][Thread 0003][akka://pos-system/user/CommandRouter] now supervising akka://pos-system/user/CommandRouter/ProductSubscriptionHandler
[DEBUG][17-07-2017 23:05:39][Thread 0003][akka://pos-system/user/CommandRouter] *Unhandled message from akka://pos-system/temp/d* : Documents.Commands.AddProductSubscription
[DEBUG][17-07-2017 23:05:39][Thread 0007][akka://pos-system/user/CommandRouter/ProductSubscriptionHandler] Started (Consumers.Service.Commands.ProductSubscriptionHandler)
恐怕那裏有點太不熟悉了我。 (我沒有使用過無類型的演員,也沒有使用DI。)但是,理論上,你不應該將_routees變量聲明爲* static *(因爲那樣你可以在actor-threads和線程之間共享它)。不過,我懷疑這是你的問題。 (如果是我,我會通過在try catch塊中包裝所有東西來進行調查,也許會覆蓋一些生命週期(prerestart?)方法,額外的日誌記錄和thread.sleeps。)這可能是一件愚蠢的事情:-)不要詢問調用需要等待他們嗎? – mwardm
這段代碼很難說出什麼問題。你能在github上提供一個複製例子嗎?我想,這會讓事情變得更快。 此外,如果您有計劃使用此設計進行分發,您可能有興趣使用Akka.Cluster.Sharding插件,它與您的代碼具有相同的功能。 – Horusiath