2016-09-13 20 views
0

我使用RX實體時,有以下方案:如何檢查所有需要的訂閱者是否完成了他們的工作?

Observerable1 
     | 
------------------ 
|  |   | 
S1(*) S2(*)  S3 (onCompleted() starts another observable) 
        | 
       Observable2 
        | 
       S4(*) 

我需要知道什麼時候所有*用戶完成他們的工作(S1,S2,S4)。因爲它們可以在不同的線程中執行,所以我需要一些同步機制,或者有一些來自rx-java的開箱即用解決方案?

一些示例代碼來說明當前的設計:

@Component 
public class BatchReader { 
    @Autowired List<Subscriber<List<Integer>>> subscribers; 

    public void start() { 
     ConnectableObservable<List<Integer>> observable1 = createObservable(); 
     subscribers.forEach(observable1::subscribe); 
     observable1.connect(); 
    } 

    private ConnectableObservable<List<Integer>> createObservable() { 
     return Observable.create((Subscriber<? super List<Integer>> subscriber) -> { 
      try {  
       subscriber.onStart(); 

       while (someCondition) { 
        List<Integer> numbers = ...; 
        subscriber.onNext(numbers); 
       } 

       subscriber.onCompleted(); 
      } catch (Exception ex) { 
       subscriber.onError(ex); 
      } 
     }).observeOn(Schedulers.newThread()).publish(); 
    } 
} 

在S3我具有以下邏輯:

@Component 
public class S3 extends Subscriber<List<Integer>> { 
    @Autowired AnotherBatchReader anotherBatchReader; 
    .... 
    @Override 
    public void onCompleted() { 
     anotherBatchReader.start(); 
    } 
    ... 
} 

和S4所預訂在AnotherBatchReader:

@Component 
public class AnotherBatchReader { 
    @Autowired S4<List<Foo>> subscriber4; 

    public void start() { 
     Observable<List<Foo>> observable2 = createObservable(); 
     observable2.subscribe(subscriber4); 
    } 

    private Observable<List<Foo>> createObservable() { 
     return Observable.create(subscriber -> { 
      try { 
        subscriber.onStart(); 
        while (someConditionBar) { 
         List<Foo> foo = ...; 
         subscriber.onNext(foo); 
        } 
        subscriber.onCompleted(); 
       } 
      } catch (RuntimeException ex) { 
       subscriber.onError(ex); 
      } 
     }); 
    } 
} 

那麼,有一個當我感興趣的所有用戶完成他們的工作時,要如何正確通知? rx是否支持它?或者也許有更好的設計來支持它?

編輯: 我有單獨的用戶,因爲每個人都有不同的行爲。最後,用戶*(S1,S2,S3)將把他們的數據寫入xml文件。但

  • S1接收數據onNext(),做一些工作,並直接向文件寫入結果
  • S2接收數據onNext(),做一些工作,聚集在現場的結果,然後用onCompleted
  • S3將它寫入在onNext接收數據,做了一些工作,DB寫入結果和onCompleted被稱爲後開始的觀察到它開始從數據庫獲取數據,並把它推到S4
  • S4在onNext()接收數據,做了一些工作,並寫入文件

爲什麼我需要在S3中的數據寫入到數據庫的原因是由於從接收到的數據在onNext()生成的結果必須是唯一的,但我是從Observable1獲得批量的數據,我可以」沒有保證這個獨特性,所以DB照顧它。

當然,在S3我不能像S2那樣做(將所有結果累積在內存中),因爲S3中存在的結果的乘積與S2相比是顯着的。

+0

這看起來waaaay太複雜 - 你有什麼用呢?爲什麼你有分開的用戶? –

+0

@TassosBassoukos我編輯問題回答你的問題 – marknorkin

+0

感謝您的澄清。 –

回答

1

感謝您的澄清。在我看來,現有運營商的審慎應用將使您的代碼最小化。現在,我不知道所有的細節,但你在做什麼感覺了很多這樣的:

Observable<T> observable1 = ... 
     .share(); 

Observable<?> S1 = S1(observable1); 
Observable<?> S2 = S2(observable1); 
Observable<?> S3 = S3(observable1); 
Observable<?> S4 = defer(() -> readFromDatabase()).compose(S4::generate); 

Observable.merge(S1,S2,S3.ignoreElements().switchIfEmpty(S4)) 
    .ignoreElements() 
    .onComplete(...) 
    .subscribe(); 

當然,細節將取決於是否原裝觀察到的是冷是熱是不同的, S[1-4]的詳細信息。

此外,不要試圖自己駕駛所有Subscribers,讓框架爲您做,並且您將會得到更多 - f.e.:

S4 = Observable.create(SyncOnSubscribe.generateStateless(
     observer -> observer.onNext(<get List<Foo>>) 
    )) 
    .takeWhile(list -> someConditionBar); 

編輯:這是XY問題的情況下 - 我們都穿過了這...

相關問題