2016-12-31 57 views
0

我正在訪問屬於不同計算過程的內存區域。 該地區的變化相對較少,我需要在發生變化時進行計算。我收到有關更改的通知,但我需要稍等一下,以確保不再進行更改。我像這個模型:如果有新物品到達可觀察物體,我可以檢查觀察者嗎?

var readyToBeProcessed = changed 
    .Select(x => DateTime.Now) 
    .Throttle(TimeSpan.FromSeconds(5)); 

但是我的計算需要相當一段時間,有可能是內存的變化,而我做他們。在這種情況下,我需要將這一輪計算標記爲無效。

但是我怎麼知道我的觀察,當我完成了計算,如果另一個事件到達與否,在處理當前事件?如果自開始計算以來沒有事件到達,那麼它是有效的,我可以存儲結果。

在實踐中,這是非常罕見,該事件中,使計算變得無效的模式(速度不夠快)到達,我還是想以應付這種情況。

注:我知道,我不能保證始終有效的計算。記憶變化和我收到事件之間有一小段時間。這是完全可能的,序列是這樣的1)我正在做計算2)內存變化3)我完成計算並檢查事件,並確定計算有效4)內存變化事件到達。我很高興與此住了,現在

readyToBeProcessed.Subscribe(x => 
{ 
    Log.Info("Start work..."); 
    // Do calculation here 
    ... 
    // When finished 
    if (Is there a new item) 
    { 
     Log.Info("There were changes while we worked... Invalidating"); 
     Invalidate(); 
    } 
    else 
    { 
     Log.Info("Succeeded"); 
    } 
}, cancellationToken); 

是反應壞適合這項任務?

回答

1

的Rx實際上是一個偉大的選擇,在這裏,我覺得,雖然你可能需要多一點明確建模。

想想事實上有五種類型的事件:項目的變化,做 - 工作開始,做 - 工作結束,失效,成功(我希望我可以使用更好的名字,但我正在努力什麼你寫了)。

這裏是他們是如何工作的一個大理石圖:

t(sec)  : 0--1--2--3--4--5--6--7--8--9--10-11-12-13-14-15-16... 
item-change : *-*--**-----------------*-------------------------... 
do-Work-begins: ---------------------*-----------------*----------... 
do-Work-ends : -------------------------*------------------*-----... 
invalidate : -------------------------*------------------------... 
succeeded  : --------------------------------------------*-----... 

我們開始工作,一旦出現了項目變化5秒平靜。如果在工作時間內有任何變化,我們希望在工作完成時失效。如果不是,我們希望觀察成功。

var doWorkBegins = changed 
    .Select(x => DateTime.Now) 
    .Throttle(TimeSpan.FromSeconds(5)); 

var doWorkEnds = doWorkBegins 
    .SelectMany(x => 
    { 
     Log.Info("Start work..."); 
     // DoWork(); 
     // 
     // should return an observable that returns a single value when complete. 
     // If DoWork is just a void, then can use 
     // return Observable.Return(Unit.Default); 
    }); 

var lists = changed 
    .Buffer(() => doWorkEnds) 
    .Publish().RefCount(); 

var succeeded = lists 
    .Where(l => l.Count == 0); 

var invalidate = lists 
    .Where(l => l.Count > 0); 

invalidate.Subscribe(x => 
{ 
     Log.Info("There were changes while we worked... Invalidating"); 
     Invalidate(); 
}, cancellationToken); 

succeeded.Subscribe(x => 
{ 
    Log.Info("Succeeded"); 
}, cancellationToken); 
1

理想情況下,我會建議你使用一個Task保持你的工作軌道,那麼你可以使用:

readyToBeProcessed 
.Select(evt => Observable.StartAsync<Unit>(async (cancellationToken) => 
{   
    //pass cancellationToken to work 
    var result = await DoWork(cancellationToken); 
    //test token if needed 
    return result; 
})) 
.Switch() 
.Subscribe(); 

當下次郵件到達,當前令牌將被取消。

+1

'Switch'是這裏的關鍵。您將計算結果「投影」到另一個異步值(a.k.a未來,任務或一個值的可觀察序列)。現在你有一系列的序列。當Switch()運算符到達時,它將訂閱內部序列,但取消該訂閱(以及當更新的內部序列到達時,基本的「任務」)。http://www.introtorx.com/content/v1.0.10621 .0/17_SequencesOfCoincidence.html –

+0

切換器將無法工作,原因有兩個:其中,「切換鍵」是項目,不可以進行處理。其次,他的描述聽起來像DoWork充滿了副作用,這隻有在正確兌現取消標記的情況下才會起作用。由於有一種方法失效,我假設它沒有。 – Shlomo

+0

儘管可以在取消標記上註冊「無效」。 – Asti