2015-05-14 44 views
0

我有一個寧靜的API接收一組JSON消息,這些消息將被轉換爲Avro個別消息,然後發送給Kafka。在路由內部,我調用了3個不同的角色:1)一個actor出去並從磁盤2中檢索Avro模式),然後遍歷JSON消息數組,並將其與第二個actor中的Avro模式進行比較。如果任何消息沒有驗證,那麼我需要將響應返回給API的調用者並停止處理。 3)遍歷數組並傳入第3個參與者,該參與者接受JSON對象,將其轉換爲Avro消息併發送給Kafka主題。在噴霧路線中鏈接Akka Actor

如果我遇到問題,我的頭部纏繞的是如何在路線中停止處理,如果其中一個演員失敗。我將請求上下文傳遞給每個actor,並調用它是完整的方法,但它似乎不立即停止,即使不應該,下一個actor仍然處理。下面是我在做什麼的航線代碼片段:

post { 
entity(as[JsObject]) { membersObj => 
     requestContext => 
     val membersJson = membersObj.fields("messages").convertTo[JsArray].elements 
     val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2())) 
     val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService())) 
     val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService())) 

     implicit val timeout = Timeout(5 seconds) 

     val future = avroService ? AvroSchema.MemberSchema(requestContext) 
     val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema] 

     for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext) 

     for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext)) 

我已經經歷了很多圍繞這一話題的博客/書籍/幻燈片和不知道的最好的方法是什麼的看着。我一直在使用Scala/Akka大約2個月,基本上我只是自學了一些我一直需要的東西。因此,對於經驗豐富的Scala/Akka/Spray開發人員所具有的任何洞察力,都非常感謝。我想過的一件事就是把這3名演員包裝在一位「主人」演員中,並讓每個演員都成爲這個演員的孩子,並試圖像那樣接近它。

回答

0

由於您正在使用異步處理(!),所以在發送消息後無法控制處理。您需要使用ask(?),這將返回您可以使用的未來。

但我有一個更好的主意。你可以從第一個演員發送消息到第二個演員。而不是將結果返回給第一個參與者,您可以將消息發送給第三個參與者以繼續計算。

+0

卡洛斯,謝謝你的想法。因此,我所做的就是創建一個路由調用的「主管」角色,並且該角色處理與路由的接口(管理來自其他3個角色的成功和錯誤消息,然後一旦Actor A成功,消息演員B.如果演員B成功,則調用演員C.如果演員A,B,C中存在錯誤,則將錯誤消息發回給監督演員。現在就像魅力一樣工作。 –