2016-08-27 69 views
1

我正在使用RxSwift來包裝移動應用程序的服務器同步過程。我有一個Observable<RemoteEvent>,它包裝一個websocket連接併發出每個收到的消息Event。同樣,我有一個包含API同步過程的Observable<SynchronizationResult>。一旦我的應用程序打開WebSocket連接,服務器將發送一個hello消息。收到該消息後,我想開始同步過程並緩存所有事件,直到同步完成。這是我掙扎的地方。目前,我有:緩衝區可觀察直到另一個Observable成功完成

self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in 
    guard event.type == "hello" else { 
    return (state.0?.concat(Observable.just(event)), state.1 + [event]) 
    } 

    // This is the sync operation 
    return (
    self.synchronizationService 
     .synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true) 
     .toArray() 
     .flatMap { results -> Observable<RemoteEvent> in 
     (state.1 + [event]).toObservable() 
     }, 
    [] 
) 
} 
.flatMapLatest { $0.0 ?? Observable.empty() } 

儘管這是非常醜陋的,它也有顯著的錯誤:在同步Observable任何傳入事件結果被重新訂閱,然後重新啓動整個同步過程。我確定必須有更好的方法來做到這一點。

+0

將 「你好」 是由WebSocket的所發出的第一個事件?如果沒有,「hello」之前的任何事件是否會被緩衝,或者是否應該在「hello」/ sync-start之後緩衝任何事件?這個用例看起來很奇怪,但我會忽略它,並將其作爲面值,因爲我認爲你已經將你的問題改爲實際用例的簡單版本。 – solidcell

+0

@solidcell假設「hello」是第一個事件是安全的。在此之前的任何事情(無論在實踐中)都可以忽略。你是對的,這既是一個奇怪的用例,也是一個簡化版本的問題。但最終,對於應用程序來說,這是防止競爭條件最簡單的方法(例如,如果同步發生,然後websocket連接,則可能會錯過事件;如果websocket連接並同步而沒有緩衝區,則事件可能是在支持數據可用之前進行處理)。這個解決方案大多是暫時的,直到解決其他問題。 –

+0

這是怎麼回事?:「如果同步發生,然後websocket連接」。如果websocket需要連接以便出現「hello」消息,以便我們嘗試啓動同步,那麼在websocket連接之前可能會發生同步? – solidcell

回答

1

這裏是你如何能得到你正在尋找的功能:

// this is a stub for the purpose of the example 
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance) 
let websocketEvents = interval 
    .map { i -> String in 
     if i == 1 { 
      return "hello" 
     } else { 
      return String(i) 
     } 
    } 
    .replayAll() 

websocketEvents.connect() 

func performSync() -> Observable<Void> { 
    return Observable<Void>.create { o in 
     print("starting sync") 
     // actually start sync with server 
     // .... 
     delay(2.0) { 
      print("sync finished") 
      o.onNext(()) 
     } 
     return NopDisposable.instance 
    } 
} 

// websocket events as they come, regardless of sync status 
websocketEvents 
    .subscribeNext { e in 
     print("websocket event received: \(e)") 
    } 

// all websocket events, buffered and only emitted post-sync 
websocketEvents 
    .filter { $0 == "hello" } 
    .flatMapLatest { _ in performSync() } 
    .flatMapLatest { _ in websocketEvents } 
    .subscribeNext { e in 
     print("websocket event post sync: \(e)") 
    } 

這將輸出:

的WebSocket事件好評:0
的WebSocket事件收到:你好
開始同步
收到的websocket事件:2
收到的websocket事件:3
同步完成
WebSocket的事件後同步:0
WebSocket的事件後同步:你好
WebSocket的事件後同步:2
WebSocket的事件後同步:3
的WebSocket事件接收:4
WebSocket的事件後同步:4收到
的WebSocket事件:5
的WebSocket事件後同步:5

+0

這真棒!一個問題 - 我正確地認爲'replayAll'會繼續保持從流中發出的所有項目,即使它們已經被回放了嗎?這裏有關於內存使用的擔憂嗎?我當前(非常黑客和破解)的方法確實對緩衝區施加了最大大小限制,並且如果在緩衝區之前無法完成同步(出於簡潔目的,未在問題中列出),錯誤將從流中排除。 –

+0

是的,這絕對是值得關注的東西。有'replay(bufferCount:Int)'這可能會限制緩衝區大小。但是,如果可能的話,您應該考慮針對同步情況採用不同的方法,而不是緩衝元素。 – solidcell

+0

感謝您的澄清。至於同步,我玩過幾種不同的方法。我最大的粉絲之一就是讓服務器在最後x個時間內保存事件緩存。然後在連接上它只會丟棄你錯過的所有東西,所以客戶沒有額外的工作要做。問題在於(取決於應用程序和事件類型),這會比創建API調用導致更多的數據使用。 –