2017-02-20 111 views
0

我有下面的代碼片段:阿卡流+阿卡的Http傳遞參數

case class SomeClass(param1:String,param2:String,param3:String) 

    val someClassActorSource: Source[SomeClass, ActorRef] = Source 
     .actorPublisher[SomeClass](Props[SomeClassActorPublisher]) 

    val someFlow: ActorRef = Flow[SomeClass] 

     .mapAsync(3)(f=> getDocumentById(f)) 

     .map(f =>{ 
      val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test") 
      .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a) 
      ) 
      (request,request) 

     }).via(connection) 

     //Parsing Response 
     .mapAsync(3){ 
      case (Success(HttpResponse(status, _, entity, _)),request)=> 
      entity.dataBytes.runFold(ByteString(""))(_ ++ _) 
     } 
     .map(resp =>parse(resp.utf8String,?????????????)) 
     .to(Sink.someSink{....}) 
     .runWith(someClassActorSource) 

    def parse(resp:String,parseParam:String)=???? 

,並在某處,我發短信給流量代碼:

someflow ! SomeClass("a","b","c") 
someflow ! SomeClass("a1","b1","c1") 

我的問題是該方法解析應該從原來如此類

所以對於第一條消息使用參數2應該是

parse(response,"b") 

和第二條消息應該是

parse(response,"b1") 

所以現在的問題是,我怎麼能取從我提交給流方法的參數?

回答

1

假設您的connection值正在通過

val connection = Http().cachedHostConnectionPool(...) 

實例可以使用該連接發生在一個元組的事實,而不是簡單地傳遞request兩次元組可以在輸入SomeClass通過。此SomeClass實例將不得不經過您的每個Flow值才能進入解析階段。

修改你的代碼位:

val getDocumentFlow = 
    Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map(d => d -> f)) 

你的問題沒有說明從getDocumentById返回類型,所以我只是用Document

val documentToRequest = 
    Flow[(Document, SomeClass)] map { case (document, someClass) => 
    val request = ... 

    (request, someClass) 
    } 

val parseResponse = 
    Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){ 
    case (Success(HttpResponse(status, _, entity, _)), someClass) => 
     entity 
     .dataBytes 
     .runFold(ByteString(""))(_ ++ _) 
     .map(e => e -> someClass) 
    } 

val parseEntity = Flow[(ByteString, SomeClass)] map { 
    case (entity, someClass) => parse(entity.utf8String, someClass) 
} 

這些流可以被用來作爲在問題中描述:

val someFlow = 
    someClassActorSource 
    .via(getDocumentFlow) 
    .via(documentToRequest) 
    .via(connection) 
    .via(parseResponse) 
    .via(parseEntity) 
    .to(Sink.someSink{...}) 
    .run()