akka-stream

    1熱度

    1回答

    我下面this answer 創建使用阿卡流的SQS消費者: def queryForMessages = { val messages = Sqs.receive(queueUrl, 3, 10) println(s"Received from sqs: ${messages.map(_.getBody)}") messages } def m

    0熱度

    1回答

    更新:我把我的問題在test project來解釋我的意思詳細 ============== ================================================== ===== 我有一個Akka源contiune從數據庫表中讀取,groupby一些關鍵然後減少它。但是,在我應用reduce函數後,似乎數據永遠不會發送到sink,因爲上游始終有數據到來,所以它會保持減少

    0熱度

    1回答

    有兩個表TableA和TableB。 我需要一些記錄複製從TableA到TableB。我用slick-3.0,並使用以下方法: import akka.stream._ import akka.stream.scaladsl._ ... //{{ READ DATA FROM TABLE A val q = TableA.filter(somePredicate).result val

    0熱度

    1回答

    我想有條件地在一個值(非物化)Source上加上一個值。我應該怎麼做? val src: Source[_,NotUsed] = ??? Source.combine(Source.single(???), src) 的Source.combine文檔提到 使用給定的策略,例如合併或CONCAT ,但不提供關於選擇的concat策略的例子。

    0熱度

    1回答

    我第一次使用Akka Streams Testkit,並沒有找到一個很好的模式來測試一個流而不是在時間窗口內產生一個值。 這工作: intercept[AssertionError] { // '.expectNext' throws this src.request(1) .expectNext(100 millis) // expect no entries in ...

    0熱度

    2回答

    我試圖用阿卡流流的文件和正在運行到一個小問題,提取流的結果爲未來[字符串]: def streamMigrationFile(source: Source[ByteString, _]): Future[String] = { var fileString = "" val sink = Sink.foreach[ByteString](byteString => fileSt

    2熱度

    1回答

    我用阿卡流「ActorPublisher演員作爲流每個連接的數據Source發送到傳入的WebSocket或HTTP連接。 ActorPublisher的contract是定期通過提供需求請求數據 - 下游可接受的元素數量。如果需求爲0,我不應該發送更多元素。我觀察到,如果我緩衝元素,當消費者速度緩慢時,緩衝區大小在1到60之間波動,但大多數在40-50之間。 要流我使用阿卡-HTTP「s到的We

    1熱度

    4回答

    有一個整數的一些流: val source = Source(List(1,2,3,4,5)) 是否有可能獲得從源頭上(count, sum)結果?對於上面的例子,它將是(5, 15)。 我想我應該用流量,並結合他們: val countFlow = Flow[Int].fold(0)((c, _) => c + 1) val sumFlow = Flow[Int].fold(0)((s, e)

    2熱度

    1回答

    我已閱讀Akka streams materialization concept,並理解流物化是: ,以運行服用流描述(圖),並分配它需要的所有必要資源的過程。 我跟着一個例子,使用mapMaterializedValue構建我的akka​​流,將消息發送到隊列。代碼的目的是推動信息流藍圖後,排隊已經建立和代碼工作,但我真的不明白是什麼mapMaterrializaedValue代碼做: Prom

    0熱度

    1回答

    我有一個工作akka-http應用程序。現在我嘗試通過slf4j和logback添加日誌記錄以及我的應用程序崩潰。 build.sbt libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.0.7", "ch.qos.logback" % "logback-classic" % "1.2.3",