我使用RX實體時,有以下方案:如何檢查所有需要的訂閱者是否完成了他們的工作?
Observerable1
|
------------------
| | |
S1(*) S2(*) S3 (onCompleted() starts another observable)
|
Observable2
|
S4(*)
我需要知道什麼時候所有*
用戶完成他們的工作(S1,S2,S4
)。因爲它們可以在不同的線程中執行,所以我需要一些同步機制,或者有一些來自rx-java
的開箱即用解決方案?
一些示例代碼來說明當前的設計:
@Component
public class BatchReader {
@Autowired List<Subscriber<List<Integer>>> subscribers;
public void start() {
ConnectableObservable<List<Integer>> observable1 = createObservable();
subscribers.forEach(observable1::subscribe);
observable1.connect();
}
private ConnectableObservable<List<Integer>> createObservable() {
return Observable.create((Subscriber<? super List<Integer>> subscriber) -> {
try {
subscriber.onStart();
while (someCondition) {
List<Integer> numbers = ...;
subscriber.onNext(numbers);
}
subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}).observeOn(Schedulers.newThread()).publish();
}
}
在S3我具有以下邏輯:
@Component
public class S3 extends Subscriber<List<Integer>> {
@Autowired AnotherBatchReader anotherBatchReader;
....
@Override
public void onCompleted() {
anotherBatchReader.start();
}
...
}
和S4所預訂在AnotherBatchReader:
@Component
public class AnotherBatchReader {
@Autowired S4<List<Foo>> subscriber4;
public void start() {
Observable<List<Foo>> observable2 = createObservable();
observable2.subscribe(subscriber4);
}
private Observable<List<Foo>> createObservable() {
return Observable.create(subscriber -> {
try {
subscriber.onStart();
while (someConditionBar) {
List<Foo> foo = ...;
subscriber.onNext(foo);
}
subscriber.onCompleted();
}
} catch (RuntimeException ex) {
subscriber.onError(ex);
}
});
}
}
那麼,有一個當我感興趣的所有用戶完成他們的工作時,要如何正確通知? rx是否支持它?或者也許有更好的設計來支持它?
編輯: 我有單獨的用戶,因爲每個人都有不同的行爲。最後,用戶*
(S1,S2,S3)將把他們的數據寫入xml文件。但
- S1接收數據
onNext()
,做一些工作,並直接向文件寫入結果 - S2接收數據
onNext()
,做一些工作,聚集在現場的結果,然後用onCompleted
- S3將它寫入在
onNext
接收數據,做了一些工作,DB寫入結果和onCompleted
被稱爲後開始的觀察到它開始從數據庫獲取數據,並把它推到S4 - S4在
onNext()
接收數據,做了一些工作,並寫入文件
爲什麼我需要在S3中的數據寫入到數據庫的原因是由於從接收到的數據在onNext()
生成的結果必須是唯一的,但我是從Observable1
獲得批量的數據,我可以」沒有保證這個獨特性,所以DB照顧它。
當然,在S3
我不能像S2
那樣做(將所有結果累積在內存中),因爲S3中存在的結果的乘積與S2相比是顯着的。
這看起來waaaay太複雜 - 你有什麼用呢?爲什麼你有分開的用戶? –
@TassosBassoukos我編輯問題回答你的問題 – marknorkin
感謝您的澄清。 –