2014-01-09 140 views
3

假設我有一個服務:拍攝快照的的IObservable <T>

public interface ICustomersService 
{ 
    IObservable<ICustomer> Customers 
    { 
     get; 
    } 
} 

Customers財產開始通過搶佔了所有現有客戶,並把它們傳遞到觀察員,執行後,它只能通過對客戶說稍後會添加到系統中。因此,它永遠不會完成。

現在假設我想抓住當前客戶的快照(作爲List<ICustomer>),忽略將來可能添加的任何內容。我怎麼做?任何ToList()或其親屬的調用將永遠阻止,因爲序列永遠不會完成。

我想我可以寫我自己的擴展,所以我想這:

public static class RxExtensions 
{ 
    public static List<T> ToSnapshot<T>(this IObservable<T> @this) 
    { 
     var list = new List<T>(); 

     using (@this.Subscribe(x => list.Add(x))); 

     return list; 
    } 
} 

這似乎工作。例如:

var customers = new ReplaySubject<string>(); 

// snapshot has nothing in it 
var snapshot1 = customers.ToSnapshot(); 

customers.OnNext("A"); 
customers.OnNext("B"); 

// snapshot has just the two customers in it 
var snapshot2 = customers.ToSnapshot(); 

customers.OnNext("C"); 

// snapshot has three customers in it 
var snapshot3 = customers.ToSnapshot(); 

我意識到目前的實現依賴於調度是當前線程,否則在收到項目之前ToSnapshot可能會關閉其訂閱。不過,我懷疑我也可以包含一個ToSnapshot覆蓋,它需要IScheduler並確保在結束快照之前接收到計劃在那裏的任何項目。

我無法在Rx中找到這種快照功能。我錯過了什麼嗎?

+0

假設「快照」僅僅是枚舉當前成員所產生的枚舉......然後我無法想象快照例程會做什麼ToList()不會做。你能詳細說明一下嗎? –

+0

他的'Snapshot'擴展程序立即處理訂閱,僅收到'Subscribe'方法返回之前產生的通知。 –

+0

這實際上比這更多。對'IObservable '使用'ToList()'將直到'IObservable '完成纔會完全沒有任何問題。由於我的'IObservable'永遠不會完成,因此只需調用'ToList()'就會無限期地阻塞。 –

回答

0

你在這裏所做的實際上很漂亮。 ToSnapshot的工作原因是因爲在釋放控制流之前,您的訂閱邏輯的底層實現是將所有客戶交給觀察者。基本上,Dispose僅在控制流被釋放後才被調用,並且控制流只在獲得所有預先存在的聯繫後才被釋放。

雖然這是,這也是一個誤導。你寫的方法,ToSnapshot,應該被命名爲TakeSyncronousNotifications。該擴展正在對潛在觀察值的工作方式做出重大假設,並且不符合Rx的精神。

爲了讓消費者更容易理解,我將公開顯示聲明返回的其他屬性。

public interface ICustomersService 
{ 
    IEnumerable<ICustomer> ExistingCustomers { get; } 
    IObservable<ICustomer> NewCustomers { get; } 
    IObservable<ICustomer> Customers { get; } 
} 

public class CustomerService : ICustomerService 
{ 
    public IEnumerable<ICustomer> ExistingCustomers { get { ... } } 
    public IObservable<ICustomer> NewCustomers { get { ... } } 
    public IObservable<ICustomer> Customers 
    { 
     get 
     { 
      return this.ExistingCustomers.ToObservable().Concat(this.NewCustomers); 
     } 
    } 
} 

編輯:

考慮以下問題...

50 = x + y。解決並評估x

數學只是不工作,除非你知道y是什麼。在這個例子中,y是「新客戶」,x是「現有客戶」,而50是兩者的組合。

通過僅公開現有客戶和新客戶的組合,而不是現有客戶和新客戶自己,您已經丟失了太多數據。您需要向消費者展示至少xy,否則無法解決其他問題。

+0

感謝您的輸入。你在Rx中沒有提到可以做我想做的事情。在這種情況下,我用Rx咆哮錯誤的樹嗎? –

+0

你試圖一次做太多事情。 Rx非常適合新客戶部分。現有客戶應該公開爲IEnumerable,並且可以使用上面給出的代碼公開兩者的組合。 –

1

你可以嘗試使用超時您觀察到的

source.Customers().TakeUntil(DateTime.Now).ToEnumerable(); 
+0

謝謝,但這太難以預料了。我正在尋找具有明確定義的語義的東西。此外,在枚舉上調用「ToList」只會導致超時異常(奇怪)。 –

+0

抱歉超時是錯誤的。我會解決它。 – bradgonesurfing

+0

修復了使用TakeUntil(DateTime.Now)的代碼。超時異常是您使用Timeout時應該期待的。我的錯誤。 – bradgonesurfing

1

有幾種方法可以解決這個。我在商業項目中嘗試了以下成功案例:

1)一種單獨的方法,可以像克里斯演示的那樣獲得當前客戶的可枚舉數。

2)將「世界狀況」調用與直播流相結合的方法 - 這比Chris的示例更有意義,因爲爲了確保沒有錯過的數據,通常必須首先開始直播流,然後獲取快照,然後將它們與去重複組合。

我使用定製的Observable.Create實現實現了這一點,該實現高速緩存直播流直到檢索歷史記錄,然後在切換到現場之前將高速緩存與歷史記錄合併。

返回的客戶,但包含描述數據的年齡的額外的元數據。

3)最近,對我來說返回IObservable<IEnumerable<Customer>>更有用,其中第一個事件是整個世界狀態。這個更有用的原因是我工作的很多系統都批量更新,而且整個批次更新UI的速度往往比逐項更快。除此之外,它與(2)類似,只不過您可以使用FirstAsync()來獲取所需的快照。

我建議你考慮這種方法。如果需要,您可以隨時使用SelectMany(x => x)IObservable<IEnumerable<Customer>>的數據流平整爲IObservable<Customer>

當我回到家庭辦公室時,我會看看是否可以挖掘出一個示例實施!

+0

選項2只適用於冷觀察對象。否則,你有可能錯過第一個「n」個參數。 –