2017-09-28 37 views
0

給定一個Service對象,我想確保每個對該服務的函數調用都不會產生副作用。在我的情況下,無論函數A在做什麼,除非調度器可用,否則什麼都不會在函數B中執行。concatMap/flatMap應該立即在同一個調度程序上運行

這裏是這個樣子:

class Service { 

    func handleJobA(input: String) -> Observable<String> { 
     return Observable.just(input) 
      .do(onNext: { (str) in 
       print ("Job A: \(str)") 
      }) 
      .concatMap { input -> Observable<String> in 
       return Observable.just("Job AA: \(input)") 
        .delay(2, scheduler: self.scheduler) 
        .do(onNext: { (str) in 
         print (str) 
        }) 
      } 

      .subscribeOn(scheduler) 
    } 

    func handleJobB(input: String) -> Observable<String> { 
     return Observable.just(input) 
      .do(onNext: { (str) in 
       print ("Job B: \(str)") 
      }) 
      .delay(1, scheduler: scheduler) 
      .concatMap { input -> Observable<String> in 
       return Observable.just("Job BB: \(input)") 
        .do(onNext: { (str) in 
         print (str) 
        }) 
      } 

      .subscribeOn(scheduler) 
    } 


    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service") 
} 


let service = Service() 

_ = Observable.from(["1","2","3"]) 
    .concatMap { service.handleJobA(input: $0) } 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

_ = Observable.from(["1","2","3"]) 
    .concatMap { service.handleJobB(input: $0) } 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

import PlaygroundSupport 

PlaygroundPage.current.needsIndefiniteExecution = true 

目前,輸出爲:

Job A: 1 
Job B: 1 
Job BB: 1 
Job BB: 1 √ 
Job B: 2 
Job AA: 1 
Job AA: 1 √ 
Job A: 2 
Job BB: 2 
Job BB: 2 √ 
Job B: 3 
Job BB: 3 
Job BB: 3 √ 
Job AA: 2 
Job AA: 2 √ 
Job A: 3 
Job AA: 3 
Job AA: 3 √ 

然而,這說明根本問題。內部延遲(可能發生在任何事情上,實際上......網絡,處理)都會導致可觀察的處理失去「順序」。

我想是這樣的:

Job A: 1 
Job AA: 1 
Job AA: 1 √ 
Job B: 1 
Job BB: 1 
Job BB: 1 √ 
Job B: 2 
Job BB: 2 
Job BB: 2 √ 
Job B: 3 
Job BB: 3 
Job BB: 3 √ 
Job A: 2 
Job AA: 2 
Job AA: 2 √ 
Job A: 3 
Job AA: 3 
Job AA: 3 √ 

這意味着,一旦功能已開始處理任務,沒有其他人獲得的訪問,除非它完成。

我以前收到很好的answer。它並不完全適用,因爲flatMap/concatMap(?)似乎都不喜歡調度程序。

我的理論是,concatMap調用確實做了正確的工作,但是然後將子序列省略調度到調度器隊列的末尾,而我希望它在前面,接下來要處理。

回答

1

我無法解釋調度行爲,但我可以做一個小建議

...一旦功能已開始處理任務,除非是做旁若無人得到的 訪問。 ..

您可以通過concatMap通過所有handleJob電話來獲得您所需要的行爲:

Observable 
    .from([1,2,3,4,5,6]) 
    .flatMap({ (value) -> Observable<String> in 
     switch value % 2 == 0 { 
     case true: 
      return service.handleJobA(input: "\(value)") 
     case false: 
      return service.handleJobB(input: "\(value)") 
     } 
    }) 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

服務等級示例:

private class Service { 

    private lazy var result = PublishSubject<(index: Int, result: String)>() 
    private lazy var publish = PublishSubject<(index: Int, input: String, transformation: (String) -> String)>() 
    private lazy var index: Int = 0 
    private lazy var disposeBag = DisposeBag() 

    init() { 
     publish 
      .asObservable() 
      .concatMap({ (index, input, transformation) -> Observable<(index: Int, result: String)> in 
       let dueTime = RxTimeInterval(arc4random_uniform(3) + 1) 
       return Observable 
        .just((index: index, result: transformation(input))) 
        .delay(dueTime, scheduler: self.scheduler) 
      }) 
      .bind(to: result) 
      .disposed(by: disposeBag) 
    } 

    func handleJobA(input: String) -> Observable<String> { 
     let transformation: (String) -> String = { string in 
      return "Job A: \(string)" 
     } 
     return handleJob(input: input, transformation: transformation) 
    } 

    func handleJobB(input: String) -> Observable<String> { 
     let transformation: (String) -> String = { string in 
      return "Job B: \(string)" 
     } 
     return handleJob(input: input, transformation: transformation) 
    } 

    func handleJob(input: String, transformation: @escaping (String) -> String) -> Observable<String> { 
     index += 1 
     defer { 
      publish.onNext((index, input, transformation)) 
     } 
     return result 
      .filter({ [expected = index] (index, result) -> Bool in 
       return expected == index 
      }) 
      .map({ $0.result }) 
      .take(1) 
      .shareReplayLatestWhileConnected() 
    } 

    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service") 
} 
相關問題