2017-02-20 95 views
0

我學習rxSwift,並且我想爲c longpolling服務器與模仿永久連接的這個服務的交互做服務。我寫了,但在我看來,這並不是說這個決定可以做得更好嗎?是否有可能以某種方式重複Observable,而不考慮錯誤,並取決於longpoll服務器的響應。LongPolling與rxSwift

任何人都可以分享解決方案嗎?或幫助建議?如何組織好?我希望看到一個更好的解決方案,因爲只有開始學習rxswift

class LongPollingService { 

    public var messageReciver: PublishSubject<EventProtocol> = PublishSubject<EventProtocol>() 

    private let transport = DefaultTransport() 

    private let disposeBag = DisposeBag() 

    private var currentRequestInfo = Variable<LongpollingServerInfo?>(nil) 

    private var currentRequestDisposable: Disposable? 

    private var currentLongpollingConnection: Disposable? // Subsribee for request server info 

    private var eventListener : Disposable? 

    private var currentReqursiveConnection: Disposable? // Subscriber for event listener from longpoll server 

    func startObservableEvents() { 
     getServerConnection() 
     subscribeServerInfo() 
     //testing listen events 
     eventListener = messageReciver.showMessagesInDebugMode().subscribe() 
     eventListener?.addDisposableTo(disposeBag) 
    } 

    func disconnect() { 
     currentRequestDisposable?.dispose() 
     currentLongpollingConnection?.dispose() 
     currentReqursiveConnection?.dispose() 
    } 

    private func subscribeServerInfo() { 
     currentLongpollingConnection = currentRequestInfo 
      .asObservable() 
      .filter({$0 != nil}) 
      .subscribe(onNext: { [weak self] (info) in 
       guard let sSelf = self else { return } 
       sSelf.subscribeToEvents(timeStamp: info!.ts) 
      }) 
     currentLongpollingConnection?.addDisposableTo(disposeBag) 
    } 

    private func subscribeToEvents(timeStamp: TimeInterval) { 
     if let serverInfo = currentRequestInfo.value { 
      currentReqursiveConnection?.dispose() 
      currentReqursiveConnection = getEventsFromLongpollServer(serverInfo: serverInfo, with: timeStamp) 
       .flatMap(parseUpdates) 
       .flatMap(reciveEvents) 
       .showErrorsSwiftMessagesInDebugMode() 
       .subscribe(onNext: { [weak self] updates in 
        guard let sSelf = self else { return } 
        sSelf.subscribeToEvents(timeStamp: updates) 
       }, 
       onError: { [weak self] error in 
        guard let sSelf = self else { return } 
         if let error = error as? LongPollError { 
          switch error { 
          case .olderHistory(let ts): sSelf.subscribeToEvents(timeStamp: ts) 
          default: sSelf.getServerConnection() 
          } 
         } 
       }) 
      currentReqursiveConnection?.addDisposableTo(disposeBag) 
     } 
    } 

    private func getServerConnection() { 
     //get longpolling server info for connection. 
     currentRequestDisposable = getLongpollServerInfo() 
      .subscribe(onNext: {[weak self] info in 
       guard let sSelf = self else { return } 
       sSelf.currentRequestInfo.value = info 
      }) 
     currentRequestDisposable?.addDisposableTo(disposeBag) 
    } 

    private func parseUpdates(json: Any) throws -> Observable<LongPollingUpdates> { 
     let response = try Mapper<LongPollingUpdates>().map(JSONObject: json) 
     return .just(response) 
    } 

    private func reciveEvents(updates:LongPollingUpdates) throws -> Observable<TimeInterval> { 
     if let errors = updates.failed { 
      throw parseErrors(errors: errors) 
     } 
     if let events = updates.updates { 
      parseUpdates(updates: events) 
     } 
     return Observable.just(updates.timeStamp!) 
    } 

    private func parseUpdates(updates: [[Any]]) { 
     updates.forEach { (array) in 
      let firstElementInUpdate = array.first 
      if let update = firstElementInUpdate as? Int { 
       switch update { 
       case 1: break 
       case 2: break 
       case 3: break 
       case 4: messageReciver.onNext(NewMessage(array: array)) 
       default: break 
       } 
      } 
     } 
    } 

    private func parseErrors(errors: [String: Any]) -> LongPollError { 
     if let error = errors["failed"] as? Int { 
      switch error { 
      case 1: 
       guard let ts = errors["ts"] as? TimeInterval else { return .unkownError } 
       return .olderHistory(ts: ts) 
      case 2: return .needNewkey 
      case 3: return .needCaseAndTs 
      case 4: return .unkownVersion 
      default: 
       return .unkownError 
      } 
     } 
     return .unkownError 
    } 

    private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo, with ts: TimeInterval) -> Observable<Any> { 
     let url = buildLongPollingServerRoute(from: serverInfo, with: ts) 
     let request = buldLongPollRequst(route: url) 
     let requestConvert = try? URLEncoding.default.encode(request!, with: nil) 
     return transport.makeRequest(request: requestConvert!) 
    } 

    private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo) -> Observable<Any> { 
     let url = buildLongPollingServerRoute(from: serverInfo) 
     let request = buldLongPollRequst(route: url) 
     let requestConvert = try? URLEncoding.default.encode(request!, with: nil) 
     return transport.makeRequest(request: requestConvert!) 
    } 

    private func getLongpollServerInfo() -> Observable<LongpollingServerInfo> { 
     let request = MessageRouter.getLongpollServer(useSsl: false, needPts: false) 
     return transport.makeModel(request: request) 
    } 

} 

回答

1

因此,假如你有一個像功能:

func getData() -> Observable<Data> 

而且要長輪詢它在特定period,你可以做這樣的事情:

Observable<Int>.interval(period, scheduler: MainScheduler.instance) 
    .map { _ in return } 
    .flatMap(getData) 
    .subscribe(/* ... handle data ... */) 
    .disposed(by: disposeBag) 

您可以使用其他調度比MainScheduler如果是比較合適的。

現在如果你想也處理Error s表示getData可能會發出,你不希望這樣的必然退訂長輪詢,那麼你可以這樣做:

func handleError(error: Error) -> Observable<Data> { 
    return Observable.empty() 
} 

Observable<Int>.interval(period, scheduler: MainScheduler.instance) 
    .map { _ in return } 
    .flatMap { return getData.catchError(handleError) } 
    .subscribe(/* ... handle data ... */) 
    .disposed(by: disposeBag) 

您還可以分析錯誤handleError,然後決定是否要繼續發送一個空的Observable或通過發出另一個錯誤取消長查詢。

+0

謝謝,但間隔並不適合我的目的,我需要得到的事件不是特定的週期性,並且自服務器最後一次事件發生時起。 subscribeToEvents(timeStamp: - timestamp這是參數作爲服務器上發生的最後一個事件。 – Zept