2017-04-18 32 views
0

此問題與​​有關。rxpy將物品注入可觀察區域

我想構建一個處理來自可觀察源消息的反應系統。除此之外,我正試圖將它與基於zookeeper的領導者選舉系統相結合。

這種組合只允許一個進程場中的一個領導者處理消息流。以下是我正在嘗試構建的代碼的要點。

# event_source is an observable of messages 
# manager.leaders is an observable of leader election events 
# manager.followers is an observable of leader relinquish events 
event_source\ 
    .skip_until(manager.leaders)\ 
    .take_until(manager.followers)\ 
    .subscribe(observer) 

它工作正常和所有的,但我需要skip_untiltake_until一塊來處理回填之間注入。這是爲了解決領導者流程失敗與假設領導的另一個流程之間的潛在差距。每個處理過的消息都會留下一條記錄,以便新領導者在繼續處理流之前可以趕上丟失的消息(如果有的話)。

我試過start_with運營商沒有成功。我是不是以一種不適合使用的方式來接近它?

最終,我正在尋找的解決方案是在由來自另一個流的事件觸發的流中注入特定數量的項目。

回答

0

這個怎麼樣:

manager.leaders \ 
    .flat_map(lambda e: event_source 
        .start_with(...) 
        .take_until(manager.followers)) 

每次manager.leaders發出消息event_source將訂閱,從注入項目,直到manager.followers發出。