我有一個很重的用戶數據流。我想通過它的id來確定這是否是新用戶。爲了減少對數據庫的調用,我寧願在先前用戶的內存中維護一個狀態。 val users = mutable.set[String]()
//init the state from db
user = db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink:
我試圖通過設置Kafka服務器並使用生產者發送消息來在本地測試我的代碼,但我想知道是否有一種方法可以爲此編寫單元測試一段代碼(測試消費者收到的消息是否正確)。 val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootst
我是Akka/Scala的新手,正在嘗試調試下面的代碼。當resultSetParser有一個例外時,它不會拋出它。相反,使用此代碼的服務只是永遠閒置。 如何讓我的服務拋出異常,而不僅僅是在流中等待?在Akka中有沒有類似watchException()的函數,我可以在watchTermination()之後調用它,使它在處理流時看到異常? val chunkSource: Source[Chun