akka-stream

    2熱度

    2回答

    如果我有一個來自cassandra查詢的較大結果集。 有沒有辦法讓這個結果集成爲Akka-Stream的來源,這樣我可以處理每一行? 感謝

    0熱度

    1回答

    嗨我正在嘗試處理文件中的數據。 這是我在下面使用的代碼。 我有一份期貨清單,並試圖從這些期貨中獲得產出。 一切都很好,但返回的最後一行在OnSuccess之前執行。 如何在沒有阻止操作的情況下更改該行爲。 def processRow(rowNumber: Int, row: String, delimiter: String, rules: List[Rule]): RowMessage = {

    5熱度

    1回答

    我的應用程序有一個Akka-Websocket接口。 Web套接字由一個actor-subscriber和一個actor actor組成。用戶通過將命令發送給相應的演員來處理命令。發佈者在事件流上進行偵聽並將更新信息發佈迴流(最終發佈到客戶端)。這很好。 我的問題:訂閱者如何將事件發送迴流?例如確認接收到的命令的執行。 public class WebSocketApp extends HttpA

    0熱度

    1回答

    Akka Http中的Web Socket連接被視爲Akka Streams Flow。這似乎對於基本的請求回覆非常有效,但是當消息也應該通過WebSocket發送時,它會變得更加複雜。我的服務器的核心看起來有點像: lazy val authSuccessMessage = Source.fromFuture(someApiCall) lazy val messageFlow = reque

    8熱度

    1回答

    我正在嘗試將基於akka流的流程整合到我的Play 2.5應用程序中。這個想法是,您可以在照片中進行流式處理,然後將其寫入磁盤作爲原始文件,縮略圖版本和水印版本。 我設法使用圖形像這樣得到這個工作:使用蓄電池這樣 val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder

    2熱度

    1回答

    我開始學習Akka Stream。我有簡化爲這是一個問題: import akka.actor.ActorSystem import akka.stream.{ActorMaterializer, ClosedShape} import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source} object Test exte

    3熱度

    1回答

    是否可以將Outlet[A]分爲FlowOps[A, _]?也就是說,如果我有這樣的: import akka.NotUsed import akka.stream.Outlet import akka.stream.scaladsl.{FlowOps, GraphDSL, Source} def filter(in: Outlet[Double]) (implicit b:

    2熱度

    1回答

    我在Akka Stream中瞭解到,一個插座必須連接到一個插座,並且沒有自動支持將多個接收器連接到相同的信號源。所以你必須插入中間對象,如Broadcast。 我正在將一個信號處理DAG轉換爲一個Akka Stream圖,如果我可以在通過遍歷發現它們時動態地添加接收器,它將會對我有很大的幫助。如果我有自定義GraphStage,我可以在Graph.create階段有我自己的Shape的outlet

    5熱度

    1回答

    如果我已經創造了阿卡流的RunningGraph,我怎麼能知道(從外部) 當所有的節點都由於取消了完成? 當所有節點由於錯誤而被停止時?

    4熱度

    2回答

    我試圖解決akka流的問題,我希望我的初始生產者可以連續獲取隊列/ mongodb集合,並在配置的時間間隔內完成它。常見的做法來實現呢?