0
給定一個Service對象,我想確保每個對該服務的函數調用都不會產生副作用。在我的情況下,無論函數A在做什麼,除非調度器可用,否則什麼都不會在函數B中執行。concatMap/flatMap應該立即在同一個調度程序上運行
這裏是這個樣子:
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.just(input)
.do(onNext: { (str) in
print ("Job A: \(str)")
})
.concatMap { input -> Observable<String> in
return Observable.just("Job AA: \(input)")
.delay(2, scheduler: self.scheduler)
.do(onNext: { (str) in
print (str)
})
}
.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.just(input)
.do(onNext: { (str) in
print ("Job B: \(str)")
})
.delay(1, scheduler: scheduler)
.concatMap { input -> Observable<String> in
return Observable.just("Job BB: \(input)")
.do(onNext: { (str) in
print (str)
})
}
.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["1","2","3"])
.concatMap { service.handleJobA(input: $0) }
.subscribe(onNext:{
print($0 + " √")
})
_ = Observable.from(["1","2","3"])
.concatMap { service.handleJobB(input: $0) }
.subscribe(onNext:{
print($0 + " √")
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
目前,輸出爲:
Job A: 1
Job B: 1
Job BB: 1
Job BB: 1 √
Job B: 2
Job AA: 1
Job AA: 1 √
Job A: 2
Job BB: 2
Job BB: 2 √
Job B: 3
Job BB: 3
Job BB: 3 √
Job AA: 2
Job AA: 2 √
Job A: 3
Job AA: 3
Job AA: 3 √
然而,這說明根本問題。內部延遲(可能發生在任何事情上,實際上......網絡,處理)都會導致可觀察的處理失去「順序」。
我想是這樣的:
Job A: 1
Job AA: 1
Job AA: 1 √
Job B: 1
Job BB: 1
Job BB: 1 √
Job B: 2
Job BB: 2
Job BB: 2 √
Job B: 3
Job BB: 3
Job BB: 3 √
Job A: 2
Job AA: 2
Job AA: 2 √
Job A: 3
Job AA: 3
Job AA: 3 √
這意味着,一旦功能已開始處理任務,沒有其他人獲得的訪問,除非它完成。
我以前收到很好的answer。它並不完全適用,因爲flatMap/concatMap(?)似乎都不喜歡調度程序。
我的理論是,concatMap調用確實做了正確的工作,但是然後將子序列省略調度到調度器隊列的末尾,而我希望它在前面,接下來要處理。