0
我學習rxSwift,並且我想爲c longpolling服務器與模仿永久連接的這個服務的交互做服務。我寫了,但在我看來,這並不是說這個決定可以做得更好嗎?是否有可能以某種方式重複Observable,而不考慮錯誤,並取決於longpoll服務器的響應。LongPolling與rxSwift
任何人都可以分享解決方案嗎?或幫助建議?如何組織好?我希望看到一個更好的解決方案,因爲只有開始學習rxswift
class LongPollingService {
public var messageReciver: PublishSubject<EventProtocol> = PublishSubject<EventProtocol>()
private let transport = DefaultTransport()
private let disposeBag = DisposeBag()
private var currentRequestInfo = Variable<LongpollingServerInfo?>(nil)
private var currentRequestDisposable: Disposable?
private var currentLongpollingConnection: Disposable? // Subsribee for request server info
private var eventListener : Disposable?
private var currentReqursiveConnection: Disposable? // Subscriber for event listener from longpoll server
func startObservableEvents() {
getServerConnection()
subscribeServerInfo()
//testing listen events
eventListener = messageReciver.showMessagesInDebugMode().subscribe()
eventListener?.addDisposableTo(disposeBag)
}
func disconnect() {
currentRequestDisposable?.dispose()
currentLongpollingConnection?.dispose()
currentReqursiveConnection?.dispose()
}
private func subscribeServerInfo() {
currentLongpollingConnection = currentRequestInfo
.asObservable()
.filter({$0 != nil})
.subscribe(onNext: { [weak self] (info) in
guard let sSelf = self else { return }
sSelf.subscribeToEvents(timeStamp: info!.ts)
})
currentLongpollingConnection?.addDisposableTo(disposeBag)
}
private func subscribeToEvents(timeStamp: TimeInterval) {
if let serverInfo = currentRequestInfo.value {
currentReqursiveConnection?.dispose()
currentReqursiveConnection = getEventsFromLongpollServer(serverInfo: serverInfo, with: timeStamp)
.flatMap(parseUpdates)
.flatMap(reciveEvents)
.showErrorsSwiftMessagesInDebugMode()
.subscribe(onNext: { [weak self] updates in
guard let sSelf = self else { return }
sSelf.subscribeToEvents(timeStamp: updates)
},
onError: { [weak self] error in
guard let sSelf = self else { return }
if let error = error as? LongPollError {
switch error {
case .olderHistory(let ts): sSelf.subscribeToEvents(timeStamp: ts)
default: sSelf.getServerConnection()
}
}
})
currentReqursiveConnection?.addDisposableTo(disposeBag)
}
}
private func getServerConnection() {
//get longpolling server info for connection.
currentRequestDisposable = getLongpollServerInfo()
.subscribe(onNext: {[weak self] info in
guard let sSelf = self else { return }
sSelf.currentRequestInfo.value = info
})
currentRequestDisposable?.addDisposableTo(disposeBag)
}
private func parseUpdates(json: Any) throws -> Observable<LongPollingUpdates> {
let response = try Mapper<LongPollingUpdates>().map(JSONObject: json)
return .just(response)
}
private func reciveEvents(updates:LongPollingUpdates) throws -> Observable<TimeInterval> {
if let errors = updates.failed {
throw parseErrors(errors: errors)
}
if let events = updates.updates {
parseUpdates(updates: events)
}
return Observable.just(updates.timeStamp!)
}
private func parseUpdates(updates: [[Any]]) {
updates.forEach { (array) in
let firstElementInUpdate = array.first
if let update = firstElementInUpdate as? Int {
switch update {
case 1: break
case 2: break
case 3: break
case 4: messageReciver.onNext(NewMessage(array: array))
default: break
}
}
}
}
private func parseErrors(errors: [String: Any]) -> LongPollError {
if let error = errors["failed"] as? Int {
switch error {
case 1:
guard let ts = errors["ts"] as? TimeInterval else { return .unkownError }
return .olderHistory(ts: ts)
case 2: return .needNewkey
case 3: return .needCaseAndTs
case 4: return .unkownVersion
default:
return .unkownError
}
}
return .unkownError
}
private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo, with ts: TimeInterval) -> Observable<Any> {
let url = buildLongPollingServerRoute(from: serverInfo, with: ts)
let request = buldLongPollRequst(route: url)
let requestConvert = try? URLEncoding.default.encode(request!, with: nil)
return transport.makeRequest(request: requestConvert!)
}
private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo) -> Observable<Any> {
let url = buildLongPollingServerRoute(from: serverInfo)
let request = buldLongPollRequst(route: url)
let requestConvert = try? URLEncoding.default.encode(request!, with: nil)
return transport.makeRequest(request: requestConvert!)
}
private func getLongpollServerInfo() -> Observable<LongpollingServerInfo> {
let request = MessageRouter.getLongpollServer(useSsl: false, needPts: false)
return transport.makeModel(request: request)
}
}
謝謝,但間隔並不適合我的目的,我需要得到的事件不是特定的週期性,並且自服務器最後一次事件發生時起。 subscribeToEvents(timeStamp: - timestamp這是參數作爲服務器上發生的最後一個事件。 – Zept