我正在使用RxSwift來包裝移動應用程序的服務器同步過程。我有一個Observable<RemoteEvent>
,它包裝一個websocket連接併發出每個收到的消息Event
。同樣,我有一個包含API同步過程的Observable<SynchronizationResult>
。一旦我的應用程序打開WebSocket連接,服務器將發送一個hello
消息。收到該消息後,我想開始同步過程並緩存所有事件,直到同步完成。這是我掙扎的地方。目前,我有:緩衝區可觀察直到另一個Observable成功完成
self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in
guard event.type == "hello" else {
return (state.0?.concat(Observable.just(event)), state.1 + [event])
}
// This is the sync operation
return (
self.synchronizationService
.synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true)
.toArray()
.flatMap { results -> Observable<RemoteEvent> in
(state.1 + [event]).toObservable()
},
[]
)
}
.flatMapLatest { $0.0 ?? Observable.empty() }
儘管這是非常醜陋的,它也有顯著的錯誤:在同步Observable
任何傳入事件結果被重新訂閱,然後重新啓動整個同步過程。我確定必須有更好的方法來做到這一點。
將 「你好」 是由WebSocket的所發出的第一個事件?如果沒有,「hello」之前的任何事件是否會被緩衝,或者是否應該在「hello」/ sync-start之後緩衝任何事件?這個用例看起來很奇怪,但我會忽略它,並將其作爲面值,因爲我認爲你已經將你的問題改爲實際用例的簡單版本。 – solidcell
@solidcell假設「hello」是第一個事件是安全的。在此之前的任何事情(無論在實踐中)都可以忽略。你是對的,這既是一個奇怪的用例,也是一個簡化版本的問題。但最終,對於應用程序來說,這是防止競爭條件最簡單的方法(例如,如果同步發生,然後websocket連接,則可能會錯過事件;如果websocket連接並同步而沒有緩衝區,則事件可能是在支持數據可用之前進行處理)。這個解決方案大多是暫時的,直到解決其他問題。 –
這是怎麼回事?:「如果同步發生,然後websocket連接」。如果websocket需要連接以便出現「hello」消息,以便我們嘗試啓動同步,那麼在websocket連接之前可能會發生同步? – solidcell