我有一個場景,我開始使用alpakka多個jmsSource(對於不同的隊列)。我還需要在任何時候卸下隊列。所以我已經添加KillSwitch到jms阿卡流,如下所示: -Akka Streams KillSwitch在alpakka jms
trait MessageListener {
lazy val jmsPipeline = jmsSource
.map { x => log.info(s"Received message ${x} from ${queue}"); x }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) })
(Keep.both)
.run()
def start(): Unit = {
log.info("Invoking listener : {}", queue)
jmsPipeline
log.info("listener : {} started", queue)
}
def stop():Unit = jmsPipeline._1.shutdown()
def queue: String
}
object ListenerA extends MessageListener {
override def queue: String = "Queue_A"
}
object ListenerB extends MessageListener {
override def queue: String = "Queue_B"
}
..等等。
啓動應用程序後,所有的隊列連接並正常工作。但是,當我嘗試使用停止方法分離隊列時,並非所有隊列都斷開連接並且行爲是隨機的。我還檢查了killSwitch對所有聽衆都不同。
有人可以告訴我這裏有什麼問題嗎?