2017-12-18 196 views
1

我有一個場景,我開始使用alpakka多個jmsSource(對於不同的隊列)。我還需要在任何時候卸下隊列。所以我已經添加KillSwitch到jms阿卡流,如下所示: -Akka Streams KillSwitch在alpakka jms

trait MessageListener { 

    lazy val jmsPipeline = jmsSource 
    .map { x => log.info(s"Received message ${x} from ${queue}"); x } 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) }) 
    (Keep.both) 
    .run() 

    def start(): Unit = { 
      log.info("Invoking listener : {}", queue) 
      jmsPipeline 
      log.info("listener : {} started", queue) 
      } 
    def stop():Unit =  jmsPipeline._1.shutdown() 

    def queue: String 

} 

object ListenerA extends MessageListener { 
    override def queue: String = "Queue_A" 
} 

object ListenerB extends MessageListener { 
    override def queue: String = "Queue_B" 
} 

..等等。

啓動應用程序後,所有的隊列連接並正常工作。但是,當我嘗試使用停止方法分離隊列時,並非所有隊列都斷開連接並且行爲是隨機的。我還檢查了killSwitch對所有聽衆都不同。

有人可以告訴我這裏有什麼問題嗎?

回答

0

您的日誌支持您連接到具有不同流的多個隊列的錯覺,但是您有多個可能連接到同一隊列的流。在這兩個監聽器對象中,記錄器都會記錄覆蓋的queue名稱,但該隊列名稱不用於配置jmsSource

您沒有顯示jmsSource的定義;顯然它是在MessageListener性狀之外的某處定義的,在這種情況下,ListenerAListenerB都使用相同的jmsSource。換言之,而ListenerAListenerB具有jmsPipeline不同實例(這就是爲什麼殺開關是不同的),這兩個jmsPipeline實例由相同jmsSource實例衍生(除非jmsSourcedef即在每次調用創建一個不同的Source ,但即使情況如此,基本問題仍然存在:queue未在配置中使用)。

在Alpakka,JMS隊列上JmsSourceSettings配置,所以jmsSource可能看起來像下面這樣:

val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue") 
)      // the queue is configured here^

ListenerA.start(),例如,被調用時,以下記錄:

Invoking listener : Queue_A 
listener : Queue_A started 

再次,在以上日誌語句中的"Queue_A"ListenerA中被重寫的def queue: String成員的值;它不一定是在jmsSource(上例中的"MyQueue")中實際配置的隊列。與ListenerB以及您在map組合器中登錄的消息一樣。

一個簡單的解決方法是把的jmsSource及其JmsSourceSettings的定義MessageListener特質內部和這些設置實際使用queue

相關問題