3
我想用Akka HTTP建立一個REST服務,連接到現有的宿(使用Kafka反應流),但我無法弄清楚如何將HTTP流鏈接到Akka流接收器。將Akka HTTP連接到Akka流
我應該選擇使用Flow的低級Akka HTTP API嗎?
我的要求是有:
- 背壓的完整流程
- 200響應代碼時,所有事情都是由卡夫卡承認下沉
- 500時,背壓過高?可能嗎 ?
這裏是我的代碼當前代碼
// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)
// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)
val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}
Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)
是否有任何方式解除與實體流(作爲[事件])? – vgkowski
是的,你可以隨時對卡夫卡水槽進行解組活動。我在回答中加入了另一個例子 –
這是Akka HTTP流程中的一個新流程,不會引入一些無用的開銷? – vgkowski