2017-08-29 30 views
0

是什麼RxJava2發佈

ObservableTransformer { 
    Observable.merge(
     it.ofType(x).compose(transformerherex), 
     it.ofType(y).compose(transformerherey) 
    ) 
} 

ObservableTransformer { 
    it.publish{ shared -> 
     Observable.merge(
      shared.ofType(x).compose(transformerherex), 
      shared.ofType(y).compose(transformerherey) 
     ) 
    } 
} 

之間的差異,當我運行用本品2我的代碼,我得到了相同的結果。這裏發佈的是什麼?

回答

2

的區別在於頂部變壓器將訂閱上游兩次從下游單個訂閱,複製上游的任何副作用通常是不想要的東西:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3) 
     .doOnSubscribe(s -> System.out.println("Subscribed!")); 


mixedSource.compose(f -> 
    Observable.merge(
     f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), 
     f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) 
    ) 
) 
.subscribe(System.out::println); 

將打印

Subscribed! 
2 
3 
4 
Subscribed! 
A 
B 
C 

此處表示的副作用是打印輸出Subscribed!根據實際來源中的實際工作情況,這可能意味着發送兩次電子郵件,兩次檢索表格的行。通過這個特定的例子,你可以看到即使源值在它們的類型中是交錯的,輸出也會單獨包含它們。

相比之下,publish(Function)將爲每個終端用戶建立一個訂閱源,因此源端的任何副作用只發生一次。

mixedSource.publish(f -> 
    Observable.merge(
     f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), 
     f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) 
    ) 
) 
.subscribe(System.out::println); 

因爲源訂閱一次並且每個項目是多播到.ofType().compose()的兩個「臂」,其打印

Subscribed! 
A 
2 
B 
3 
C 
4 

+0

我一直使用這個來自傑克·沃頓的演講「管理狀態」,我不知道它到底是怎麼回事。這是如此簡潔明瞭,非常感謝你。 – Hohenhiem

1

publish運營商將您的Observable轉換爲Connectable Observable

讓我們看看Connectable Observable是什麼意思:假設你想訂閱一個可觀察的多個時間,並且想要爲每個訂戶提供相同的項目。您需要使用Connectable Observable

例子:

var period = TimeSpan.FromSeconds(1); 
var observable = Observable.Interval(period).Publish(); 
observable.Connect(); 
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); 
Thread.Sleep(period); 
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 

輸出:

first subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2 

在這種情況下,我們有足夠快認購的第一項正式發佈之前,但只有第一簽約上。第二次訂閱訂閱太晚而錯過了第一次發佈。

我們可以移動Connect()方法的調用,直到所有訂閱完成爲止。這樣,即使對Thread.Sleep的調用,我們也不會真正訂閱基礎,直到兩個訂閱都完成爲止。這將如下進行:

var period = TimeSpan.FromSeconds(1); 
var observable = Observable.Interval(period).Publish(); 
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); 
Thread.Sleep(period); 
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 
observable.Connect(); 

輸出:

first subscription : 0 
second subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2 

因此,使用Completable觀察的,我們有辦法來控制何時讓可觀察EMIT項目。取自

實施例:http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDIT 根據第一百八滑動在this link

發佈的另一性質是,如果任何觀察者開始觀察後的可觀察到的10秒開始發光物品,觀察者獲得只有那些在10秒後(在訂購時)發射的物品不是全部物品。所以在側面,我可以理解發布正在用於UI事件。而且任何觀察者都應該只接收那些在訂閱完所有事件之後才執行的事件,這是完全合理的。

希望它有幫助。

+0

謝謝@ chandil03。是的,我知道發佈的想法。但是請查看幻燈片180上的[鏈接](http://jakewharton.com/the-state-of-managing-state-with-rxjava/) 。爲什麼在調用'Observable.merge()之前需要發佈? ' – Hohenhiem

+0

@NovoDimaporo請檢查編輯並讓我知道,如果您有任何疑問。 – chandil03

+0

那麼如果你能看到整個代碼,只有1個訂閱發生,沒有進一步的subsription。正如你可以在observableTransformer中看到它的用法一樣,除了'Observable.merge()'之外,已發佈的observable實際上並不在外面顯示,那麼它的用途是什麼? – Hohenhiem