reactive-kafka

    2熱度

    1回答

    我收到來自Kafka反應流消費者的Bytestring文件;我想用這個Bytestring構造一個akka-http請求作爲實體HttpEntity.Default。 HttpEntity.Default需要Source [Bytestring,Any]作爲其參數之一。 什麼是連接兩者的最佳方式?

    0熱度

    1回答

    我想並行化寫入kafka,即有多個生產者將數據發送到kafka,儘管它來自akka流。在其他我的流中有幾個從源頭開始的階段,然後在發送數據時,我希望有大約16位工作人員同時發送數據。 我想知道是否需要在阿卡流圖DSL中嵌入Akka Streams Kafka併爲此使用平衡器,或者如果有更簡單的解決方案。另外,簡單地說,如果有人做了這樣的事情總的來說會很棒。

    0熱度

    1回答

    使用Kafka發送大型文件時,是否可以將其分配到分區中,然後使用Akka-Stream重新組裝? http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297

    0熱度

    1回答

    我試圖實現簡單的服務,將來自kafka的消息包裝在一些數據中並將其發送到外部服務。 處理消息時處理外部服務不可用的常見模式是什麼? 到目前爲止,我只在外部服務請求成功時手動提交消息。如果沒有提交,我希望kafka在一段時間後重新發送消息,以便處理外部服務失敗對消費者而言是透明的。儘管如此,我找不到一種方法。 但我很好奇,如果我沒有做一些反模式,並有更好的解決方案。

    0熱度

    1回答

    我正在使用Scala 2.11和Akka Streams Kafka 0.17。 我有一個流其中: 甲Source使用Source.actorRef創建。在這裏,演員計劃以固定間隔運行並持續生成消息,並將消息發送到流中。 我已附加Producer作爲Flow。製片人推動ProducerMessage.Message成爲卡夫卡話題。 一些數據庫操作。 我有一個問題,同時構建ProducerMessa

    2熱度

    1回答

    我正在運行一個Akka Streams Reactive Kafka應用程序,它應該在重負載下正常工作。運行該應用程序大約10分鐘後,該應用程序將以OutOfMemoryError停機。我試圖調試堆轉儲,發現akka.dispatch.Dispatcher正在佔用〜5GB的內存。以下是我的配置文件。 阿卡版本:2.4.18 反應卡夫卡版本:2.4.18 1. application.conf: c

    0熱度

    1回答

    我看着下列文件:https://github.com/akka/reactive-kafka,我看到了下面的代碼片段: implicit val actorSystem = ActorSystem("ReactiveKafka") implicit val materializer = ActorMaterializer() val kafka = new ReactiveKafka()

    0熱度

    1回答

    我是新的使用aka流kafka(和akka流一般)。我正在嘗試構建一個圖表,以便將消息發佈到不同的主題。 如何將生產者作爲流連接以提交處理後的消息?我試着用Producer.flow但由於您使用的是GraphDSL我不能得到commitScaladsl object TestFoo { import akka.kafka.ProducerMessage.Message impl

    0熱度

    1回答

    我開發反應卡夫卡在我們的遊戲斯卡拉項目,在我們創建的5個主題,由消費者團體訂閱和工作好,現在的問題是我創建了一個新的話題,如何我可以將此主題添加到現有的消費羣(是可能) 我的代碼是: val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) .wi