2016-09-29 71 views
1

我試圖用rx風格的消費者數量有限(例如2)來執行長時間的操作。RxJava的生產者 - 消費者

問題是如何確保只有兩位消費者同時執行其工作。

讓我們有一個用戶界面:

public interface Consumer{ 
    //Take a lot of time 
    Observable<Result> doJob(Task task); 

} 

和隊列類:

public class Queue { 
    public void enqueue(Task task){ 
     //TODO: enqueue task and do it with limited count of Consumers 
    } 
} 

如何組織任務隊列和消費的工作?

回答

2

使用調度程序。將同時工作限制爲兩項任務:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2)); 
events.flatMap(x -> 
    Observable.fromCallable(() -> process(x)) 
       .subscribeOn(scheduler)) 
    .subscribe(subscriber);