2017-08-13 153 views
1

我有一個play(2.4.2其中有akka 2.4.18)應用程序,我正在使用akka actors來上傳文件。我有一個父主管演員用這種層次在akka演員系統外發送響應

的UploadSupervisor ---孩子---> UploadActor ---孩子--->
DataWriteActor & MetaWriteActor

葉演員MetaWriteActor DataWriteActor執行實際寫入。我的代碼一個非常簡化的版本如下:

首先我有一個演員主管:

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor") 
override def supervisorStrategy = OneForOneStrategy() { 
    case _: Throwable => Restart 
} 

override def receive: Receive = { 
    case data: Data => uploadActor ! data 
    case meta: MetaInfo => uploadActor ! meta 
    //How do I send response outside of actor system? 
    case dataSuccess: DataUploadResponse => ??? //Line 10 
    case metaSuccess: MetaUploadResponse => ??? //Line 11 

} 

object UploadSupervisor { 
    val uploadSupervisor = Akka.system 
    .actorOf(Props(new UploadSupervisor), "UploadSupervisor") 
} 
//Request & Response case classes 
case class Data(content: String) 
case class MetaInfo(id: String, createdDate: Timestamp) 

case class DataUploadResponse(location: String) 
case class MetaUploadResponse(location: String) 

UploadActor: -

class UploadActor extends Actor { 
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor") 
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor") 

override def receive = { 
case data: Data => dataWriteActor ! data 
case meta: MetaInfo => metaWriteActor ! meta 
case dataResp: DataUploadResponse => context.parent ! dataResp 
case metaResp: MetaUploadResponse => context.parent ! metaResp 

} 
} 

DataWriteActor:

class DataWriteActor extends Actor { 
    case data: Data => //Do the writing 
        println("data write completed") 
        sender() ! DataUploadResponse("someLocation") 

} 

MetaWriteActor

class MetaWriteActor extends Actor { 
    case meta: MetaInfo=> //Do the writing 
        println(" meta info writing completed") 
        sender() ! MetaUploadResponse("someOtherLocation") 

} 

外的某處系統演員 -

implicit val timeout = Timeout(10 seconds) 
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]]) 

val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]]) 

//Do something with futures 

的問題是如何發送演員系統之外的反應如何?因爲在第10行& 11,我不能使用發件人! msg,因爲當前發件人是UploadActor。

+0

「問題是如何將響應發送到演員系統之外?」外部系統是什麼意思?一種方法是使用消息傳遞中間件並從演員那裏發送消息。消費者訂閱並從那裏消費。阿卡也使用類似的概念,但只要我知道它僅限於演員。 – Imran

+0

@Imran我的意思是從UploadSupervisor(第10和11行)到我正在使用的主線程「?「(問)獲取值 – Aiden

+0

@Imran你能給我推薦一個這樣的消息中間件,我可以探索嗎?另外,假設我不打算實例化這些演員的多個實例。您如何看待制作UploadActor單例(class to object)。然後我可以調用'UploadActor.dataWriteActor?Data(「Hello Akka」)'等 – Aiden

回答

1

你可以保持UploadSupervisor引用初始發件人:

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props[UploadActor], "UploadActor") 

    override val supervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 

    var dataSender: Option[ActorRef] = None 
    var metaSender: Option[ActorRef] = None 

    def receive = { 
    case data: Data => 
     val s = sender 
     dataSender = Option(s) 
     uploadActor ! data 
    case meta: MetaInfo => 
     val s = sender 
     metaSender = Option(s) 
     uploadActor ! meta 
    case dataSuccess: DataUploadResponse => 
     dataSender.foreach(_ ! dataSuccess) 
    case metaSuccess: MetaUploadResponse => 
     metaSender.foreach(_ ! metaSuccess) 
    } 
} 

要發送消息給UploadSupervisor

implicit val timeout = Timeout(10 seconds) 

val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse] 

val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse] 

上述假設你發送一個Data消息,一個MetaInfo消息到UploadSupervisor在一個時間。如果您發送多個DataMetaInfo消息並期望併發應答,則此方法會失效。更一般的解決方案是包括參照該包裹現有情況下的類附加case類初始發送者,通過您的演員層次結構中此參考:

case class DataMsg(data: Data, target: ActorRef) 
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef) 

case class DataUploadMsg(response: DataUploadResponse, target: ActorRef) 
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef) 

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props[UploadActor], "UploadActor") 

    override val supervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 

    def receive = { 
    case data: Data => 
     val s = sender 
     uploadActor ! DataMsg(data, s) 
    case meta: MetaInfo => 
     val s = sender 
     uploadActor ! MetaInfoMsg(meta, s) 
    case DataUploadMsg(response, target) => 
     target ! response 
    case MetaUploadMsg(response, target) => 
     target ! response 
    } 
} 

UploadActor

class UploadActor extends Actor { 
    val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor") 
    val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor") 

    def receive = { 
    case data: DataMsg => dataWriteActor ! data 
    case meta: MetaInfoMsg => metaWriteActor ! meta 
    case dataResp: DataUploadMsg => context.parent ! dataResp 
    case metaResp: MetaUploadMsg => context.parent ! metaResp 
    } 
} 

的作家:

class DataWriteActor extends Actor { 
    def receive = { 
    case DataMsg(data, target) => 
     // do the writing 
     println("data write completed") 
     sender ! DataUploadMsg(DataUploadResponse("someLocation"), target) 
    } 
} 

class MetaWriteActor extends Actor { 
    def receive = { 
    case MetaInfoMsg(meta, target) => 
     // do the writing 
     println("meta info writing completed") 
     sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target) 
    } 
} 
+0

更喜歡第二種方法。雖然它讓我創建了幾個額外的case類,但是,不介意這樣做謝謝你的幫助。接受答案。 – Aiden