我有一個寧靜的API接收一組JSON消息,這些消息將被轉換爲Avro個別消息,然後發送給Kafka。在路由內部,我調用了3個不同的角色:1)一個actor出去並從磁盤2中檢索Avro模式),然後遍歷JSON消息數組,並將其與第二個actor中的Avro模式進行比較。如果任何消息沒有驗證,那麼我需要將響應返回給API的調用者並停止處理。 3)遍歷數組並傳入第3個參與者,該參與者接受JSON對象,將其轉換爲Avro消息併發送給Kafka主題。在噴霧路線中鏈接Akka Actor
如果我遇到問題,我的頭部纏繞的是如何在路線中停止處理,如果其中一個演員失敗。我將請求上下文傳遞給每個actor,並調用它是完整的方法,但它似乎不立即停止,即使不應該,下一個actor仍然處理。下面是我在做什麼的航線代碼片段:
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
我已經經歷了很多圍繞這一話題的博客/書籍/幻燈片和不知道的最好的方法是什麼的看着。我一直在使用Scala/Akka大約2個月,基本上我只是自學了一些我一直需要的東西。因此,對於經驗豐富的Scala/Akka/Spray開發人員所具有的任何洞察力,都非常感謝。我想過的一件事就是把這3名演員包裝在一位「主人」演員中,並讓每個演員都成爲這個演員的孩子,並試圖像那樣接近它。
卡洛斯,謝謝你的想法。因此,我所做的就是創建一個路由調用的「主管」角色,並且該角色處理與路由的接口(管理來自其他3個角色的成功和錯誤消息,然後一旦Actor A成功,消息演員B.如果演員B成功,則調用演員C.如果演員A,B,C中存在錯誤,則將錯誤消息發回給監督演員。現在就像魅力一樣工作。 –