2017-02-27 79 views
3

我想用Akka HTTP建立一個REST服務,連接到現有的宿(使用Kafka反應流),但我無法弄清楚如何將HTTP流鏈接到Akka流接收器。將Akka HTTP連接到Akka流

我應該選擇使用Flow的低級Akka HTTP API嗎?

我的要求是有:

  • 背壓的完整流程
  • 200響應代碼時,所有事情都是由卡夫卡承認下沉
  • 500時,背壓過高?可能嗎 ?

這裏是我的代碼當前代碼

// flow to split group of lines into lines 
    val splitLines = Flow[String].mapConcat(_.split("\n").toList) 

// sink to produce kafka records in kafka 
val kafkaSink = Flow[String] 
    .map(new ProducerRecord[Array[Byte], String](topic, _)) 
    .toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right) 

val routes = { 
    path("ingest") { 
     post { 
     logger.info("starting ingestion") 
     entity(as[GenericEvent]) { eventIngest => 
      ????  
     }~ 
     entity(as[GenericEventList]) { eventIngestList => 
      ???? 
     } 
     } 
    } 
    } 

Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort) 

回答

2

還有的要對此有幾種方法。一個建議可能是將數據直接從您的請求實體傳輸到您的kafka接收器中。 extractDataBytes指令可以幫助你做到這一點(更多信息here)。

嘗試一下下面的例子。我添加了???流程以允許您的特定於案例的轉換來正確拆分/轉換您的請求實體字節。您可以使用類似Framing.delimiter來分割實體字節流(更多信息here)。

(extractDataBytes & extractMaterializer) { (byteSrc, mat) => 
    val f = byteSrc.via(???).runWith(kafkaSink)(mat) 
    onComplete(f){ 
     case Success(value) => complete(s"OK") 
     case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
    } 
    } 

另外,如果你想解組的實體一些領域對象,你可以這樣做

(entity(as[Event]) & extractMaterializer) { (event, mat) => 
    val f = Source.single(event).via(???).runWith(kafkaSink)(mat) 
    onComplete(f){ 
     case Success(value) => complete(s"OK") 
     case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
    } 
    } 

來你的最後一個問題,應該卡夫卡背壓,你流將永遠不會完成。您應該會在服務器給你回500所配置的請求超時後(引用下面的文檔):

一個默認請求超時全局應用到所有路線和可 配置爲使用akka.http .server.request-timeout設置(其中 默認爲20秒)。

+0

是否有任何方式解除與實體流(作爲[事件])? – vgkowski

+0

是的,你可以隨時對卡夫卡水槽進行解組活動。我在回答中加入了另一個例子 –

+0

這是Akka HTTP流程中的一個新流程,不會引入一些無用的開銷? – vgkowski