akka-stream

    0熱度

    1回答

    我有一個很重的用戶數據流。我想通過它的id來確定這是否是新用戶。爲了減少對數據庫的調用,我寧願在先前用戶的內存中維護一個狀態。 val users = mutable.set[String]() //init the state from db user = db.getAllUsersIds() val source: Source[User, NotUsed] val dbSink:

    2熱度

    1回答

    我試圖通過設置Kafka服務器並使用生產者發送消息來在本地測試我的代碼,但我想知道是否有一種方法可以爲此編寫單元測試一段代碼(測試消費者收到的消息是否正確)。 val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) .withBootst

    0熱度

    1回答

    我使用Slick 3和Akka Streams從mysql中傳輸數據。 這是我建立我的源 import slick.jdbc.MySQLProfile.api._ val enableJdbcStreaming: (java.sql.Statement) => Unit = {statement => if (statement.isWrapperFor(classOf[com.mys

    0熱度

    1回答

    的文檔FlowOps#concat指出 ...源一起使用這個流程化,並從生產,直到它的時間來斷言背壓元素只是不停。 換句話說,如果我有 sourceA.concat(sourceB) 然後sourceB將在同一時間sourceA物化。 我如何推遲sourceB正在實現,直到sourceA完成? 我正在使用Akka 2.5.6。

    1熱度

    1回答

    我有一個場景,我開始使用alpakka多個jmsSource(對於不同的隊列)。我還需要在任何時候卸下隊列。所以我已經添加KillSwitch到jms阿卡流,如下所示: - trait MessageListener { lazy val jmsPipeline = jmsSource .map { x => log.info(s"Received message ${x}

    4熱度

    1回答

    我是Akka/Scala的新手,正在嘗試調試下面的代碼。當resultSetParser有一個例外時,它不會拋出它。相反,使用此代碼的服務只是永遠閒置。 如何讓我的服務拋出異常,而不僅僅是在流中等待?在Akka中有沒有類似watchException()的函數,我可以在watchTermination()之後調用它,使它在處理流時看到異常? val chunkSource: Source[Chun

    1熱度

    1回答

    目的是從數據庫流數據,在這個組塊數據的執行一些計算(該計算返回一些情況下類的未來)發送分塊響應,併發送該數據作爲分塊的響應給用戶。目前,我能夠流式傳輸數據併發送響應,而無需執行任何計算。但是,我無法執行此計算,然後傳輸結果。 這是我實施的路線。 def streamingDB1 = path("streaming-db1") { get { val src = Source.

    0熱度

    1回答

    我正在使用Scala 2.11和Akka Streams Kafka 0.17。 我有一個流其中: 甲Source使用Source.actorRef創建。在這裏,演員計劃以固定間隔運行並持續生成消息,並將消息發送到流中。 我已附加Producer作爲Flow。製片人推動ProducerMessage.Message成爲卡夫卡話題。 一些數據庫操作。 我有一個問題,同時構建ProducerMessa

    0熱度

    1回答

    閱讀akka流的文檔,我不太清楚諸如消息順序之類的東西,以及我是否可以執行它。讓我用我爲聊天服務器編寫的一小段代碼來設置我的問題的上下文。 def flowShape(user: User) = GraphDSL .create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) { impl

    0熱度

    1回答

    使用VisualVM來配置使用akka-streams-kafka的應用程序。 它顯示了很多卡夫卡協調阻塞線程 誰是這些協調員? 我也有三個卡夫卡消費者,也阻塞線程 我需要爲他們創造一個獨立的執行上下文?