akka-stream

    1熱度

    1回答

    我正在玩Akka Streams和使用Alpakka從文件流式傳輸內容。我需要在一段時間後停止流,所以我想使用KillSwitc h。但我不知道如何使用它,因爲我正在使用圖形DSL。 My圖表看起來是這樣的: val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotU

    0熱度

    2回答

    我與akka試驗,以及(文檔以下),我有以下代碼: Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi")); Timeout askTimeout = Timeout.apply(5, TimeUnit.SECONDS); words .mapAsy

    1熱度

    1回答

    我正在嘗試使用Akka Streams,以便準確理解人們應該如何使用TCP Server從客戶端接收的內容(服務器不需要響應客戶端)。 這裏有一個標準的TCP服務器實現(申請後我從@海科-澤貝格爾的簡潔解釋here理解): def runServer(system: ActorSystem, address: String, port: Int, collectingSink: Sink[Byte

    5熱度

    1回答

    當在流流定義使用groupBy與n一些最大容量: source.groupBy(Int.MaxValue, _.key).to(Sink.actorRef) 如果我掛鉤導致說子流,一個演員水槽,並有目的地使所述子流終止於一些消息,將釋放容量爲groupBy?如果一個子流由接收器結束,是否會從n變爲n-1回到n?這是建立動態圖形的可行方法嗎?

    3熱度

    1回答

    我想測試一些Akka流功能,如conflate。爲此,我需要在簡單的單元測試中構建一個不受背壓影響的源。天真的嘗試,如 Source.tick(1.milli, 1.milli, "tick").map(_ => Random.nextDouble()) 由於背壓不起作用。 OTOH通過HTTP可能是矯枉過正。 如何創建一個簡單的Source對於不受背壓影響的單元測試?

    1熱度

    1回答

    在Play 2.5應用程序中,我需要創建一個服務,在一個事務中從數據庫中讀取大量數據,並將其作爲HTTP響應發送到客戶端。 我不想使用背壓,因爲網絡速度太慢會導致用於從數據庫獲取數據的數據庫連接將被使用太長時間。 我的當前實現首先將數據提取到臨時緩衝區(內存或文件,如果數據太多),釋放數據庫連接並返回帶有數據的Ok響應。 這樣做的缺點是,當從數據庫中完全提取數據時,數據發送到客戶端時首先啓動。 我

    1熱度

    2回答

    當我將生產者設置放入代碼中時,我一次又一次地遇到問題。當我沒有它時,一切正常。下面給出了它包含所有代碼的文件單個文件,我試圖將一個文件寫入一個kafka流。並得到這個編譯錯誤。 package somePackage import java.nio.file.Paths import akka.Done import akka.actor.{Actor, ActorLogging, Ac

    1熱度

    1回答

    我有一個圖形接受一系列文件,逐個處理它們,然後在執行結束時,如果所有執行成功或失敗,程序應該返回成功(0)或失敗(-1)。 這最後一步如何實現?水槽怎麼知道它何時收到最後一個文件的結果? val graph = createGraph("path-to-list-of-files") val result = graph.run() def createGraph(fileOrPath: S

    0熱度

    1回答

    我不能得到以下代碼工作,它編譯,但我得到錯誤Expected OnNext(_), yet no element signaled during 3 seconds。 爲什麼我得到這個錯誤,我需要做什麼來測試下面的流程類型 ? class GeneralTests extends FunSuite { implicit val system = ActorSystem("Test-Sy

    5熱度

    1回答

    鑑於我有一段很長時間的事件流,流經如下所示的事件。當很長一段時間過去後,將會有很多子流被創建而不再需要。 有一種方法來清理一個特定子在給定時間,對於 例如通過ID 3中創建的子應清洗並在13Pm失去了掃描方法的狀態 (到期婦女參與發展的屬性) ? case class Wid(id: Int, v: String, expires: LocalDateTime) test("Substream