我取消一些可觀察到的創建有一個不完美的解決方案。我使用CombineLast
至dispose
任務'Disposable
通過塞子Observable
。見代碼:
package xdean.stackoverflow.rx;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import java.util.List;
import java.util.Random;
import java.util.function.IntPredicate;
import xdean.jex.extra.Pair;
public class Q45540738 {
static Random random = new Random();
public static void main(String[] args) throws InterruptedException {
Single<List<Pair<Integer, Disposable>>> tasks = Observable.range(0, 50)
.map(i -> Pair.of(i, download(i).subscribe()))
.toList();
Subject<IntPredicate> stopers = BehaviorSubject.createDefault(i -> true);
Observable.combineLatest(tasks.toObservable(), stopers, (l, s) -> {
l.removeIf(p -> {
if (s.test(p.getLeft()) == false) {
p.getRight().dispose();
}
return p.getRight().isDisposed();
});
return l;
}).subscribe();
Thread.sleep(2000);
stopers.onNext(i -> i % 2 == 0);
Thread.sleep(10000);
}
static Observable<Integer> download(int id) {
return Observable.just(random.nextInt(1000) + 500)
.observeOn(Schedulers.computation())
.doOnNext(t -> Thread.sleep(t))
.doOnDispose(() -> System.err.printf("%d task stoped on %s\n", id, Thread.currentThread()))
.doOnNext(t -> System.out.printf("%d task done on %s\n", id, Thread.currentThread()));
}
}
輸出:
0 task done on Thread[RxComputationThreadPool-1,5,main]
5 task done on Thread[RxComputationThreadPool-6,5,main]
6 task done on Thread[RxComputationThreadPool-7,5,main]
2 task done on Thread[RxComputationThreadPool-3,5,main]
1 task done on Thread[RxComputationThreadPool-2,5,main]
3 task done on Thread[RxComputationThreadPool-4,5,main]
4 task done on Thread[RxComputationThreadPool-5,5,main]
7 task done on Thread[RxComputationThreadPool-8,5,main]
8 task done on Thread[RxComputationThreadPool-1,5,main]
9 task stoped on Thread[main,5,main]
11 task stoped on Thread[main,5,main]
13 task stoped on Thread[main,5,main]
14 task done on Thread[RxComputationThreadPool-7,5,main]
15 task stoped on Thread[main,5,main]
17 task stoped on Thread[main,5,main]
19 task stoped on Thread[main,5,main]
21 task stoped on Thread[main,5,main]
23 task stoped on Thread[main,5,main]
25 task stoped on Thread[main,5,main]
27 task stoped on Thread[main,5,main]
29 task stoped on Thread[main,5,main]
31 task stoped on Thread[main,5,main]
33 task stoped on Thread[main,5,main]
35 task stoped on Thread[main,5,main]
37 task stoped on Thread[main,5,main]
39 task stoped on Thread[main,5,main]
41 task stoped on Thread[main,5,main]
43 task stoped on Thread[main,5,main]
45 task stoped on Thread[main,5,main]
47 task stoped on Thread[main,5,main]
49 task stoped on Thread[main,5,main]
10 task done on Thread[RxComputationThreadPool-3,5,main]
16 task done on Thread[RxComputationThreadPool-1,5,main]
12 task done on Thread[RxComputationThreadPool-5,5,main]
22 task done on Thread[RxComputationThreadPool-7,5,main]
24 task done on Thread[RxComputationThreadPool-1,5,main]
18 task done on Thread[RxComputationThreadPool-3,5,main]
30 task done on Thread[RxComputationThreadPool-7,5,main]
20 task done on Thread[RxComputationThreadPool-5,5,main]
32 task done on Thread[RxComputationThreadPool-1,5,main]
26 task done on Thread[RxComputationThreadPool-3,5,main]
40 task done on Thread[RxComputationThreadPool-1,5,main]
38 task done on Thread[RxComputationThreadPool-7,5,main]
34 task done on Thread[RxComputationThreadPool-3,5,main]
28 task done on Thread[RxComputationThreadPool-5,5,main]
48 task done on Thread[RxComputationThreadPool-1,5,main]
42 task done on Thread[RxComputationThreadPool-3,5,main]
46 task done on Thread[RxComputationThreadPool-7,5,main]
36 task done on Thread[RxComputationThreadPool-5,5,main]
44 task done on Thread[RxComputationThreadPool-5,5,main]
你的意思是停止下載任務或全部任務的一部分? –
是停止flatmap中的一些下載流不是所有的任務 –
檢出TakeUntil運算符。對於每個downloadStream,您可以TakeUntil取消可見項發射一個項目。 – masp