akka-stream

    3熱度

    1回答

    我想用Akka HTTP建立一個REST服務,連接到現有的宿(使用Kafka反應流),但我無法弄清楚如何將HTTP流鏈接到Akka流接收器。 我應該選擇使用Flow的低級Akka HTTP API嗎? 我的要求是有: 背壓的完整流程 200響應代碼時,所有事情都是由卡夫卡承認下沉 500時,背壓過高?可能嗎 ? 這裏是我的代碼當前代碼 // flow to split group of lines

    0熱度

    1回答

    阿卡流我有一個流是 偵聽HTTP後接收事件的 列表mapconcat事件的流元素 轉換事件卡夫卡記錄列表 產生與反應性卡夫卡(akka流卡夫卡生產者水槽)的記錄 這裏是簡化代碼 // flow to split group of lines into lines val splitLines = Flow[List[Evt]].mapConcat(list=>list) // sin

    1熱度

    1回答

    我想使用reactivemongo-akkastream 0.12.1從Mongodb流式傳輸數據,並將結果返回到其中一條路徑(使用Akka-http)的CSV流中。 我沒有實現,這裏繼爲例: http://doc.akka.io/docs/akka-http/10.0.0/scala/http/routing-dsl/source-streaming-support.html#simple-cs

    0熱度

    1回答

    如何在下面的代碼返回concurrent.Future: val nextMeetup = ws.url(url).stream() .flatMap(response => response.body .via(framing) .map(_.utf8String) .map(_ + "\n") ) 類型不匹配錯誤: 發現:a

    1熱度

    1回答

    也似乎並沒有爲我工作。物品不會陷入其中。這是我的。 val merged: Source[ArticleWithKeywords, _] = ... val (ks, fut) = merged .alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAns

    0熱度

    1回答

    我實際上使用的是Scala,但這個問題對所有的Rx和流式框架都是通用的。我的用例是我有一個生成的observable(因此冷),我想多個消費者並行使用完全相同的值,但我希望他們有顯着不同的吞吐量。 我需要的可以通過廣播一個可重放的observable來完成,但是我發現帶有最大緩衝區大小的重播策略是從緩衝區中溢出元素(然後爲最慢的消費者丟失)而不是反壓制片商。如果你把所有廣播的觀察對象視爲熱點,這是

    1熱度

    2回答

    我正在使用Akka Streams Kafka將卡夫卡消息傳遞給遠程服務。我希望保證該服務每次只收到一條消息(至少一次,最多一次發送)。 這是我想出了代碼: private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef, topicPattern: Str

    0熱度

    1回答

    我可能錯過了卡夫卡消費者的角度,但我想要做的是: Consumer訂閱一個主題,抓住題目中的所有信息,並返回一個未來所有這些郵件列表 我已經寫了嘗試,並做到這一點的代碼是 val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) => list :+ ka

    2熱度

    2回答

    Reading a CSV files using Akka Streams - 基於此問題。 我使用Akka Streams讀取CSV。 現在我需要逐行處理它 - 但我也需要知道標題的名稱是什麼。 任何選項? UPD。澄清一下。 FileIO.fromPath(Paths.get("a.csv)) .via(Framing.delimiter(ByteString("\n"), 256, tr

    1熱度

    2回答

    我想在Akka Stream中實現自定義Source[ByteSting]。該源應該從提供的文件中讀取數據並在所提供的字節範圍內並將其傳播到下游。 起初我想,這可以通過實現在ActorPublisher中混合的Actor來完成。這種實現類似於akka.stream.impl.io.FilePublisher其內容從提供的路徑整個文件,而不是僅僅從給定的字節數據的範圍: import java.ni