2017-10-04 85 views
2

我有以下方法,它返回一個Future[Source[List[String]]](前兩個CSV文件的行):如何使用akka-http將未來[源] [[]]編組爲一個HttpResponse?

def get(url: String, charset: String, delimiter: Char, quote: Char, escape: Char) = { 
    val scanner = CsvParsing.lineScanner(
     delimiter.toByte, 
     quote.toByte, 
     escape.toByte 
    ) 

    val request = HttpRequest(GET, Uri(url)).withHeaders(`User-Agent`(UserAgent)) 

    Http(system) 
     .singleRequest(request) 
     .map { response => 
      response.entity.withoutSizeLimit.dataBytes 
       .viaMat(scanner)(Keep.left) 
       .map(row => 
        row.map(bs => 
         bs.decodeString(charset) 
        ) 
       ) 
       .take(2) 
     } 
} 

返回Future被傳遞給complete,其編組到陣列的JSON陣列使用:

implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() 

不過,我想檢查response和返回不同的HttpResponse如果它不是一個200好像要做到這一點是將Future[Source[...]]元帥在這一個HttpResponse的最佳方式方法,然後返回類型爲HttpResponse

我該怎麼做?或者,還有更好的方法?

回答

0

好的,所以我最終用不同的方法到達了那裏。

Http(system).singleRequest(request) 
    .flatMap { response => 
     response.status match { 
      case StatusCodes.OK => 
       val compression = CompressionChooser.choose(url, gzip, response) 
       response.entity.withoutSizeLimit.dataBytes 
        .via(compression.decoder.decoderFlow) 
        .viaMat(scanner)(Keep.left) 
        .map(_.map(_.decodeString(charset))) 
        .take(2) 
        .runWith(Sink.seq) 
        .map { rows => 
         val json = Json.toJson(rows) 
         HttpResponse(
          StatusCodes.OK, 
          entity = HttpEntity(ContentTypes.`application/json`, json.toString) 
         ) 
        } 

      case _ => Future successful HttpResponse(StatusCodes.BadRequest, entity = "Error") 
     } 
    } 
相關問題