2017-08-07 54 views
How to stop some of the streams in a flatMap

I am developing a file downloader, and I build my stream roughly like this:

someObservable                          // this stream can generate 1000s of downloadable URLs
    .flatMap(urlToDownload ->
        downloadStream(urlToDownload))  // this stream can be paused, resumed, and maybe cancelled
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

How can I pause or stop some of the streams inside the flatMap?

Do you mean stopping only some of the download tasks, or all of them?

Yes, stopping some of the download streams inside the flatMap, not all of the tasks.

Check out the takeUntil operator. For each downloadStream, you can takeUntil a cancellation observable emits an item. – masp
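The takeUntil suggestion in the comment above could look roughly like the sketch below, assuming RxJava 2. Everything named here is hypothetical and only for illustration: `ticks` stands in for real download progress, `cancelRequests` is a subject that carries the id of a download to cancel, and the three integer ids stand in for URLs. Each inner stream ends as soon as its own id shows up on `cancelRequests`, while the other inner streams keep running.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class TakeUntilSketch {

    public static List<String> run() {
        List<String> events = new ArrayList<>();

        // Hypothetical progress source shared by all fake downloads.
        PublishSubject<Integer> ticks = PublishSubject.create();
        // Hypothetical cancel channel: emit an id to stop that download only.
        PublishSubject<Integer> cancelRequests = PublishSubject.create();

        Observable.range(0, 3)                       // ids 0, 1, 2 stand in for URLs
            .flatMap(id -> ticks
                .take(3)                             // a fake download "finishes" after 3 ticks
                .map(t -> id + ":" + t)
                // End this inner stream when its own id is requested for cancellation.
                .takeUntil(cancelRequests.filter(c -> c.equals(id))))
            .subscribe(events::add);

        ticks.onNext(1);           // all three downloads report progress
        cancelRequests.onNext(1);  // cancel download 1 only
        ticks.onNext(2);           // downloads 0 and 2 continue
        ticks.onNext(3);           // downloads 0 and 2 finish

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a real downloader the cancel subject would be fed from the UI, and the inner stream would be the actual download observable instead of `ticks`. Note that takeUntil completes the inner stream normally, so the outer flatMap is not terminated by a cancellation.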

Answer

I have an imperfect solution for cancelling some of the created observables. I use combineLatest to dispose the tasks' Disposable through a stopper Observable. See the code:

package xdean.stackoverflow.rx; 

import io.reactivex.Observable; 
import io.reactivex.Single; 
import io.reactivex.disposables.Disposable; 
import io.reactivex.schedulers.Schedulers; 
import io.reactivex.subjects.BehaviorSubject; 
import io.reactivex.subjects.Subject; 

import java.util.List; 
import java.util.Random; 
import java.util.function.IntPredicate; 

import xdean.jex.extra.Pair; 

public class Q45540738 {
    static Random random = new Random();

    public static void main(String[] args) throws InterruptedException {
        // Start 50 downloads and keep each task's Disposable next to its id.
        Single<List<Pair<Integer, Disposable>>> tasks = Observable.range(0, 50)
            .map(i -> Pair.of(i, download(i).subscribe()))
            .toList();
        // Each emitted predicate says which ids may keep running; the default keeps all.
        Subject<IntPredicate> stoppers = BehaviorSubject.createDefault(i -> true);
        Observable.combineLatest(tasks.toObservable(), stoppers, (l, s) -> {
            // Dispose rejected tasks, then drop tasks that are disposed or already finished.
            l.removeIf(p -> {
                if (!s.test(p.getLeft())) {
                    p.getRight().dispose();
                }
                return p.getRight().isDisposed();
            });
            return l;
        }).subscribe();
        Thread.sleep(2000);
        // Keep even ids only: odd tasks that are still running get disposed.
        stoppers.onNext(i -> i % 2 == 0);
        Thread.sleep(10000);
    }

    // Simulated download: sleeps 500-1499 ms on a computation thread, then reports completion.
    static Observable<Integer> download(int id) {
        return Observable.just(random.nextInt(1000) + 500)
            .observeOn(Schedulers.computation())
            .doOnNext(t -> Thread.sleep(t))
            .doOnDispose(() -> System.err.printf("%d task stoped on %s\n", id, Thread.currentThread()))
            .doOnNext(t -> System.out.printf("%d task done on %s\n", id, Thread.currentThread()));
    }
}

Output:

0 task done on Thread[RxComputationThreadPool-1,5,main] 
5 task done on Thread[RxComputationThreadPool-6,5,main] 
6 task done on Thread[RxComputationThreadPool-7,5,main] 
2 task done on Thread[RxComputationThreadPool-3,5,main] 
1 task done on Thread[RxComputationThreadPool-2,5,main] 
3 task done on Thread[RxComputationThreadPool-4,5,main] 
4 task done on Thread[RxComputationThreadPool-5,5,main] 
7 task done on Thread[RxComputationThreadPool-8,5,main] 
8 task done on Thread[RxComputationThreadPool-1,5,main] 
9 task stoped on Thread[main,5,main] 
11 task stoped on Thread[main,5,main] 
13 task stoped on Thread[main,5,main] 
14 task done on Thread[RxComputationThreadPool-7,5,main] 
15 task stoped on Thread[main,5,main] 
17 task stoped on Thread[main,5,main] 
19 task stoped on Thread[main,5,main] 
21 task stoped on Thread[main,5,main] 
23 task stoped on Thread[main,5,main] 
25 task stoped on Thread[main,5,main] 
27 task stoped on Thread[main,5,main] 
29 task stoped on Thread[main,5,main] 
31 task stoped on Thread[main,5,main] 
33 task stoped on Thread[main,5,main] 
35 task stoped on Thread[main,5,main] 
37 task stoped on Thread[main,5,main] 
39 task stoped on Thread[main,5,main] 
41 task stoped on Thread[main,5,main] 
43 task stoped on Thread[main,5,main] 
45 task stoped on Thread[main,5,main] 
47 task stoped on Thread[main,5,main] 
49 task stoped on Thread[main,5,main] 
10 task done on Thread[RxComputationThreadPool-3,5,main] 
16 task done on Thread[RxComputationThreadPool-1,5,main] 
12 task done on Thread[RxComputationThreadPool-5,5,main] 
22 task done on Thread[RxComputationThreadPool-7,5,main] 
24 task done on Thread[RxComputationThreadPool-1,5,main] 
18 task done on Thread[RxComputationThreadPool-3,5,main] 
30 task done on Thread[RxComputationThreadPool-7,5,main] 
20 task done on Thread[RxComputationThreadPool-5,5,main] 
32 task done on Thread[RxComputationThreadPool-1,5,main] 
26 task done on Thread[RxComputationThreadPool-3,5,main] 
40 task done on Thread[RxComputationThreadPool-1,5,main] 
38 task done on Thread[RxComputationThreadPool-7,5,main] 
34 task done on Thread[RxComputationThreadPool-3,5,main] 
28 task done on Thread[RxComputationThreadPool-5,5,main] 
48 task done on Thread[RxComputationThreadPool-1,5,main] 
42 task done on Thread[RxComputationThreadPool-3,5,main] 
46 task done on Thread[RxComputationThreadPool-7,5,main] 
36 task done on Thread[RxComputationThreadPool-5,5,main] 
44 task done on Thread[RxComputationThreadPool-5,5,main]
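A possibly simpler variant of the same idea, again assuming RxJava 2, is to keep each task's Disposable in a map keyed by id and dispose only the entries that match a predicate. This is a sketch, not the answer's code: `running`, `stopWhere`, and the `ticks` subject that fakes download progress are all hypothetical names, and the run is driven by hand so the order of events is deterministic.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

public class DisposableRegistrySketch {

    public static List<String> run() {
        List<String> events = new ArrayList<>();
        // Hypothetical progress source shared by all fake downloads.
        PublishSubject<Integer> ticks = PublishSubject.create();
        // Registry of running tasks: id -> Disposable, in insertion order.
        Map<Integer, Disposable> running = new LinkedHashMap<>();

        for (int id = 0; id < 4; id++) {
            final int taskId = id;
            Disposable d = ticks
                .take(2)                                   // a fake download ends after 2 ticks
                .doOnDispose(() -> events.add(taskId + " stopped"))
                .subscribe(t -> events.add(taskId + " tick " + t));
            running.put(taskId, d);
        }

        ticks.onNext(1);                        // every task reports progress
        stopWhere(running, i -> i % 2 != 0);    // stop the odd ids only
        ticks.onNext(2);                        // the even ids continue and finish
        return events;
    }

    // Disposes and forgets every task whose id matches the predicate.
    static void stopWhere(Map<Integer, Disposable> running, IntPredicate shouldStop) {
        running.entrySet().removeIf(e -> {
            if (shouldStop.test(e.getKey())) {
                e.getValue().dispose();
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

As in the answer's output, only tasks that are disposed while still running log a stop; tasks that finish on their own never trigger doOnDispose.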