我想弄清楚如何爲特定時間窗口內不跟隨不同事件的事件創建反應式訂閱。在一段時間內事件沒有跟隨不同事件的反應式訂閱
爲了說明,這裏有一個用例:
由兩個事件觸發的忙閒指示。忙和NotBusy。它們可能會一起靠近,但指示燈不應該頻繁閃爍。
當NotBusy啓動時,應該沒有指示符。當遇忙併且NotBusy在5秒鐘內沒有開火時,它應該顯示。
有沒有一種方法可以完全在Reactive內完成而無需添加外部狀態?
我想弄清楚如何爲特定時間窗口內不跟隨不同事件的事件創建反應式訂閱。在一段時間內事件沒有跟隨不同事件的反應式訂閱
爲了說明,這裏有一個用例:
由兩個事件觸發的忙閒指示。忙和NotBusy。它們可能會一起靠近,但指示燈不應該頻繁閃爍。
當NotBusy啓動時,應該沒有指示符。當遇忙併且NotBusy在5秒鐘內沒有開火時,它應該顯示。
有沒有一種方法可以完全在Reactive內完成而無需添加外部狀態?
啊,我無法抗拒 - 多久你會反駁原始資料的作者? (見問題Benjol的評論) :)
這裏是我的刺它(LINQPad就緒):
輸出看起來是這樣的:
At 1/1/0001 12:00:00 AM +00:00, Busy Signal Indicator is now:OFF
Sending BUSY at 1/1/0001 12:00:00 AM +00:00
Sending NOTBUSY at 1/1/0001 12:00:02 AM +00:00
Sending BUSY at 1/1/0001 12:00:03 AM +00:00
Sending BUSY at 1/1/0001 12:00:06 AM +00:00
At 1/1/0001 12:00:08 AM +00:00, Busy Signal Indicator is now:ON
Sending NOTBUSY at 1/1/0001 12:00:09 AM +00:00
At 1/1/0001 12:00:09 AM +00:00, Busy Signal Indicator is now:OFF
這裏我們的活動的一個基本定義:
enum EventType
{
Busy,
NotBusy
}
class StreamEvent
{
public EventType Type {get; set;}
public StreamEvent(EventType type) { Type = type;}
}
而這裏的查詢+測試代碼:
void Main()
{
// our simulated event stream
var fakeSource = new System.Reactive.Subjects.Subject<StreamEvent>();
// Let's use a scheduler we actually don't have to wait for
var theTardis = new System.Reactive.Concurrency.HistoricalScheduler();
var busySignal = fakeSource
// Batch up events:
.Window(
// Starting batching on a busy signal
fakeSource.Where(e => e.Type == EventType.Busy),
// Stop batching on a not busy signal
(open) => fakeSource.Where(e => e.Type == EventType.NotBusy)
// but throw a timeout if we exceed 5 seconds per window
.Timeout(TimeSpan.FromSeconds(5), theTardis))
// Unpack the windows
.Switch()
// Catch any timeout exception and inject a NULL into the stream
.Catch(fakeSource.StartWith((StreamEvent)null))
// Bool-ify on "did a timeout happen?"
.Select(e => e == null)
// Start in an "unbusy" state
.StartWith(false)
// And only tell us about transitions
.DistinctUntilChanged();
using(busySignal.Subscribe(signal =>
Console.WriteLine("At {0}, Busy Signal Indicator is now:{1}",
theTardis.Now,
signal ? "ON" : "OFF")))
{
// should not generate a busy signal
Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(2));
Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
Console.WriteLine();
// should generate a busy signal
Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(3));
Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(3));
// and this should clear it
Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
Console.WriteLine();
}
}
@JayWalker您需要添加Rx程序集(您可以使用最新的Nuget LINQPad),並添加這些使用:System.Reactive.Subjects System.Reactive.Concurrency – JerKimball 2013-02-10 01:29:29
你的問題[有書呆子sniped](http://xkcd.com/356/)Rx的創造者回應他的回答:https://gist.github.com/mattpodwysocki/8d9a17df9fb94a3fb631(編輯。從[Paul Betts](http://stackoverflow.com/users/5728/paul-betts)刪除的答案複製) – Benjol 2013-02-13 05:51:24