我試圖做一些感覺應該是直截了當的事情,但證明出人意料的困難。RxJS使用異步訂戶功能的Observable
我有一個訂閱RabbitMQ隊列的函數。具體來說,這裏是Channel.consume函數:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
它返回一個承諾,該承諾用訂閱ID解析 - 稍後需要取消訂閱 - 還有一個回調參數,用於在消息從隊列中拉出時調用。
當我想取消訂閱隊列時,我需要使用Channel.cancel函數在這裏取消使用者:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。這需要先前返回的訂閱ID。
我想將所有這些東西包裝在觀察者訂閱時訂閱隊列的Observable中,並在observable退訂時取消訂閱。然而,由於調用的「雙重異步」性質(我的意思是說它們同時具有回調和返回承諾),這證明有點困難。
理想情況下,我想能夠編寫的代碼是:
return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async() => {
await channel.cancel(consumeResult.consumerTag);
};
});
然而,這是不可能的,因爲此構造不支持異步訂戶功能或拆除邏輯。
我一直無法找出這一個。我在這裏錯過了什麼嗎?爲什麼這麼難?
乾杯, 亞歷
感謝您的迴應。在提出問題之前,我也考慮了你的建議,但我的問題是沒有任何東西等待渠道的許諾。所以,假設對channel.cancel的調用僅在3秒鐘後解析。有可能在該頻道上收到新消息,但Rx觀察者已經被取消訂閱,因此這些消息將丟失到以太網中。這是我想要避免的。你有什麼建議來解決這個問題嗎? – AlexC
我不明白這是一個問題。根據[observable contract]中的*訂閱和取消訂閱*部分(http://reactivex.io/documentation/contract.html):*當觀察者向Observable發出取消訂閱通知時,Observable將嘗試停止發佈通知觀察員。然而,觀察者在發出退訂通知後,Observable將不會向觀察者發出通知。* – cartant
因此,如果通道持續抽出消息直到取消解析,觀察者應該接收它們。也就是說,觀察者的實現不應該期待沒有進一步的消息被髮布。 – cartant