akka-stream

    0熱度

    1回答

    我正在嘗試編寫Akka流圖。我寫的代碼是 val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder => (sink1, sink2) => import GraphDSL.Implicits._ val bcast = builder.

    3熱度

    1回答

    下面是使用最簡單的圖形中的Partition和Merge我能想出的,但運行時,它提供了以下錯誤: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1] 據我所知

    0熱度

    1回答

    我想弄清楚如何處理在你的一個階段中你需要進行一個返回InputStream的調用,在那裏我將處理該流作爲舞臺的來源進一步下降。 例如 Source.map(e => Calls that return an InputStream) .via(processingFlow).runwith(sink.ignore) 我想該元素將處理流程那些從InputStream到來。這基本上是我拖尾一個文

    0熱度

    1回答

    我想在Scala中實現外部合併排序。它用於排序整個主內存中不適合的大文件。 詳細信息可以在這裏找到: - How external merge sort algorithm works? 現在,我需要閱讀的文件塊,排序,並將其寫入到磁盤等等等等 什麼是閱讀的最地道的/功能性的方式/寫一個大文件的部分? 如果我使用'Source.fromFile(filename).getLines'方法,我知道我

    0熱度

    2回答

    我已經使用monix和akka-streams將List [ClassA]映射到List [ClassB]的基準,但我不明白它爲什麼如此緩慢。 我嘗試了不同的方法來映射,這裏是與江鈴控股的結果: [info] Benchmark Mode Cnt Score Error Units [info] MappingBenchmark.akkaLoadBalanceMap ss 2

    0熱度

    1回答

    我試圖解碼Marc21二進制數據記錄,它具有關於提供記錄長度的字段的以下規範。 A計算機生成的五個字符的數字,等於整個記錄的長度,包括其本身和記錄終止符。數字 是正確的,未使用的位置包含零。 我想使用 阿卡流Framing.lengthField,但我只是不知道如何指定字段的大小。我想象一個角色是8位,也許是16位,我不確定,我想知道是否依賴於平臺或語言。總之,問題是可以說出該領域的規模是什麼知道

    0熱度

    1回答

    我正在嘗試創建一個可以通過類似Iterator的東西來消費的流。 我正在實現一個公開類似於迭代器的接口的庫,所以這對我來說是最簡單的東西。 我目前設計的圖形本質上是Source<Iterator<DataRow>>。有一件事我看到到目前爲止是將其壓扁到Source<DataRow>然後用後跟https://docs.oracle.com/javase/8/docs/api/java/util/st

    0熱度

    1回答

    我想聽聽使用阿卡流SQS阿卡流的狀態,我得到的消息從它的Q 使用此代碼段: implicit val system = ActorSystem() implicit val mat = ActorMaterializer() implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ConfigUt

    1熱度

    1回答

    我有一個在雲中運行某處的Kafka Broker,當我試圖通過命令行消費者工具使用它時,我可以使用消息。但是,當我將相同的端點放在我的akka​​-stream卡巴消費者設置中時,它不起作用。 例如: - bin/kafka-console-consumer.sh --zookeeper主機名:2181/xxx/yyy - 主題名稱 這對我有用。但是當我通過ConsumerSetting做同樣的事

    0熱度

    1回答

    在akka-http websocket應用程序中,我有一條Route迴應給定的消息,並且我還需要在應用程序中維護狀態。所以下面的工作正常: override protected def routes: Route = pathSuffix("echo") { handleWebSocketMessages(echoMessageFlow) } def