akka-stream

    0熱度

    2回答

    在使用Akka Streams時,似乎我永遠無法獲得錯誤處理權限。 所以這是我的代碼 var db = Database.forConfig("oracle") var mysqlDb = Database.forConfig("mysql_read") var mysqlDbWrite = Database.forConfig("mysql_write") implicit val ac

    4熱度

    1回答

    TLDR:是更好兌現每個請求一個流(即使用短壽命流)或使用跨請求單個流物化,當我有傳出http請求作爲一部分流的? 詳細信息:我有一個典型的服務,需要一個HTTP請求時,它驅散幾個第三方服務的下游(不是由我控制),然後送回前彙總的結果。我正在使用akka-http進行客戶端實施並噴灑服務器(遺留下來,隨着時間的推移將會轉移到akka-http)。示意性地: request -> map -1-*-

    2熱度

    1回答

    我正在運行一個Akka Streams Reactive Kafka應用程序,它應該在重負載下正常工作。運行該應用程序大約10分鐘後,該應用程序將以OutOfMemoryError停機。我試圖調試堆轉儲,發現akka.dispatch.Dispatcher正在佔用〜5GB的內存。以下是我的配置文件。 阿卡版本:2.4.18 反應卡夫卡版本:2.4.18 1. application.conf: c

    1熱度

    1回答

    我有一個來源,分組元素和一個水槽,使批量請求, 我使用KillSwitch能夠關閉圖形在某些任意時間點。 ,最新一批不完整的記錄中源輸出得到時​​被稱爲 val source = Source.tick(10.millis, 10.millis, "tick").grouped(500) val (switch, _) = source.viaMat(KillSwitches.single)(

    2熱度

    1回答

    我有以下方法,它返回一個Future[Source[List[String]]](前兩個CSV文件的行): def get(url: String, charset: String, delimiter: Char, quote: Char, escape: Char) = { val scanner = CsvParsing.lineScanner( delimiter.to

    1熱度

    1回答

    我是Akka Streams的新手,我有一個問題。 所以我可以從服務器操作和處理數據的一些客戶(從下面的官方文檔的源代碼)。 private static final ActorSystem system = ActorSystem.create("Client"); private static final Materializer materializer = ActorMaterializ

    0熱度

    1回答

    我運行一個阿卡流卡夫卡應用程序,我想結合的流消費者的監督策略,例如,如果經紀人下降,流消費者停止超時後死亡,主管可以重新啓動消費者。 這裏是我的完整代碼: UserEventStream: import akka.actor.{Actor, PoisonPill, Props} import akka.kafka.{ConsumerSettings, Subscriptions} import

    0熱度

    1回答

    我在8080端口上它完美,並顯示以下消息在本地運行一個簡單的阿卡HTTP服務器: Started server at 127.0.0.1:8080, press enter to kill server 我使用sbt-assembly創建的.jar文件。它生活在target/scala-2.12/my-app-assembly-0.1.jar 然後,我創建了一個簡單的Dockerfile,如:

    0熱度

    1回答

    我是新來的阿卡流。我從github運行下面的例子。但是,向「Helloer」actor發送的消息不會在輸出控制檯中接收和顯示。 StreamingApp.scala import _root_.akka.actor.{ Actor, Props } import org.apache.spark._ import org.apache.spark.streaming._ import org

    3熱度

    2回答

    我有一個包含20萬個用戶的500,000個元素和一個隊列。信息以不同的速度處理(1,15,30,60秒,3,50分鐘,3,16小時或更長時間,24小時爲超時)。我需要消費者的迴應,以便對數據進行一些處理。我將爲此和基於事件的onComplete使用Scala Future。 爲了不淹沒隊列,我想發送前30條消息到隊列:消費者將選擇20條消息,並且10條隊列將在隊列中等待。當其中一個Future s