2016-11-01 122 views
0

我有以下代碼來修改我的源observable,以便它在應用程序狀態在ENABLED和DISABLED之間切換時斷開連接並重新連接。但是,如果我的sourceObservable本身有一個調用完成的情況,我懷疑它會立即重新連接,因爲這個變換器的重複。如果應用程序狀態仍然是ENABLED。repeatWhen基於觸發完成的事件

是否有一種解決這個問題的優雅方式,或許將takeUntil分組並重復,以便它們不會捕獲來自上面的任何完整事件?

N.B. applicationStatusObservable根據請求重複其最後一個值。

幫助是非常讚賞:)

/** 
* Transforms the source observable so that it defers initial subscription until the service becomes available, 
* unsubscribes when the service becomes unavailable, and resubscribes when the service becomes available again. 
*/ 
public class AvailabilityTransformer<T> implements Observable.Transformer<T, T> { 

    private final Observable<ApplicationStatus> applicationStatusObservable; 

    AvailabilityTransformer(final Observable<ApplicationStatus> applicationStatusObservable) { 
     this.applicationStatusObservable = applicationStatusObservable; 
    } 

    @Override 
    public Observable<T> call(final Observable<T> sourceObservable) { 
     final Observable<ApplicationStatus> applicationEnabledObservable = 
       applicationStatusObservable.filter(applicationStatus -> applicationStatus == ENABLED); 
     final Observable<ApplicationStatus> applicationDisabledObservable = 
       applicationStatusObservable.filter(applicationStatus -> applicationStatus != ENABLED); 
     return sourceObservable 
       .takeUntil(applicationDisabledObservable) // Unsubscribe whenever the application is disabled 
       .repeatWhen(repeatObservable -> repeatObservable.flatMap(repeat -> 
         applicationEnabledObservable.flatMap(applicationStatus -> just(repeat)))) // Resubscribe when enabled again 
       .delaySubscription(applicationEnabledObservable.first()); // Delay the initial subscription until the application is first enabled 
    } 
} 
+0

applicationStatusObservable是熱還是冷?那麼,你想用AvailabilityTransformer實現什麼?我不明白。 –

+0

applicationStatusObservable對於重播(1).refCount()是冷的 - 在任何情況下,它總是會返回當前的應用程序狀態,並且如果觀察者保持訂閱狀態,則會通知應用程序狀態的任何更改。 這個想法是,如果應用程序被禁用,我們取消源可觀察訂閱。當它再次啓用時,它將重新訂閱源觀察值。 –

+0

縱觀OperatorTakeUntil和OnSubscribeDelay等的實現我認爲它可能更適合將它寫成運營商而不是Transformer ..現在關閉。 –

回答

0

他,

我花了一些時間來了解你在做什麼。我希望我終於明白了。您有一個熱點可觀察的isApplicationAvailable,它會告知訂閱者該應用程序是啓用還是禁用(true/false)。你還有其他的可觀察者,他們正在做一些工作(可能是熱/冷),並希望他們只產生值,如果observable isApplicationAvailable返回true。如果對於某些應用程序而言,應用程序變爲禁用狀態,則可觀察對象不應生成任何值。

我使用RxJava2進行測試。

在這個例子中,如果observable isApplicationActive返回true,那麼stringObservable只會將值傳遞給訂閱者。

@Test 
public void name() throws Exception { 
    Subject<Boolean> isApplicationActive = BehaviorSubject.<Boolean>create() 
      .toSerialized(); 

    Observable<Boolean> isApplicationActiveObservable = isApplicationActive 
      .hide() 
      .doOnNext(s -> System.out.println("isApplicationActive: " + s)); 

    isApplicationActive.onNext(false); 

    Thread.sleep(1_000); 

    Observable<String> stringObservable = Observable.interval(1_000, TimeUnit.MILLISECONDS) 
      .map(Objects::toString) 
      .doOnNext(s -> System.out.println("NextIntervalValue")) 
      .compose(createSwitchMapCompose(isApplicationActiveObservable)); 

    stringObservable.subscribe(s -> { 
     System.out.println("stringObservable: " + s); 
    }); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(true); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(false); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(true); 

    Thread.sleep(6_000); 

    isApplicationActive.onNext(false); 

    Thread.sleep(20_000); 
} 

private ObservableTransformer<String, String> createSwitchMapCompose(Observable<Boolean> isApplicationActiveObservable) { 
    return upstream -> upstream.switchMap(s -> isApplicationActiveObservable.take(1).flatMap(aBoolean -> { 
       if (aBoolean) { 
        return Observable.just(s); 
       } 
       return Observable.empty(); 
      }) 
    ); 
} 
相關問題