2012-07-25 71 views
12

Throttle如果其他人跟隨得太快,則會跳過可觀察序列的值。但我需要一種方法來延遲它們。也就是說,我需要設置項目之間的最小延遲,而不會跳過任何在不跳過值的情況下節流接收Rx.Observable

實際例子:有一個Web服務可以接受請求不會比每秒一次更快;有一個用戶可以添加請求,單個或批量。沒有Rx,我會創建一個列表和一個計時器。當用戶添加請求時,我會將它們添加到列表中。在計時器事件中,我會檢查列表是否爲空。如果不是,我會發送請求並刪除相應的項目。用鎖和所有的東西。現在,通過Rx,我可以創建Subject,當用戶添加請求時添加項目。但我需要一種方法來確保Web服務不會因應用延遲而被淹沒。

我是Rx的新手,所以也許我錯過了一些明顯的東西。

回答

7

有一個相當簡單的方法做你想要使用EventLoopScheduler什麼。

我從一個可觀測值開始,每0到3秒會隨機產生一次值。

var rnd = new Random(); 

var xs = 
    Observable 
     .Generate(
      0, 
      x => x < 20, 
      x => x + 1, 
      x => x, 
      x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0)); 

現在,爲了使這個輸出值立即除非最後一個值是內一秒鐘前我這樣做:

var ys = 
    Observable.Create<int>(o => 
    { 
     var els = new EventLoopScheduler(); 
     return xs 
      .ObserveOn(els) 
      .Do(x => els.Schedule(() => Thread.Sleep(1000))) 
      .Subscribe(o); 
    }); 

這有效地觀察到在EventLoopScheduler源,然後把它睡在每個OnNext後1秒,以便它在喚醒後才能開始下一個OnNext

我測試了一下它的工作與此代碼:

ys 
    .Timestamp() 
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0) 
    .Subscribe(x => Console.WriteLine(x)); 

我希望這有助於。

+0

不是'Thread.Sleep'壞嗎?我一直認爲暫停一個線程是基本上「定時器」的事情是浪費資源。 – 2017-06-18 19:07:05

+0

@VarvaraKalinina - 它發生在它自己的線程上,它不會使其他任何東西死鎖。這種情況很好。 – Enigmativity 2017-06-18 23:37:54

+0

我沒有說它是關於死鎖或阻塞的東西。這是關於資源浪費。幾乎所有的時間你都會做出一個強有力的行動。它可以做其他事情,如果它不被阻止 – 2017-06-19 08:34:52

0

如何使用可觀察計時器從阻塞隊列中取出?下面的代碼是未經測試,但應該給你的我的意思的想法...

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ... 

Observable 
    .Timer(new TimeSpan(0,0,1), new EventLoopScheduler()) 
    .Do(i => myWebService.Send(workQueue.Take())); 

// Then just add items to the queue using workQueue.Add(...) 
+0

該解決方案可能會工作,類似於從問題的「RX之前」的解決方案,但有一個缺點:請求不會立即發送,甚至是可能的;只有當計時器滴答時。 1秒延遲不是很重要,但如果需要延遲的時間是1分鐘,那麼這是一個問題。 – Athari 2012-07-25 17:51:39

+0

@Athari不正確。第一個計時器滴答將阻止Take(),並執行一個項目入隊的時刻。您可以嘗試將超時設置爲1分鐘以查看我的意思。 – 2012-07-25 17:57:45

+0

我試過了,實際上還有另外一個問題:有時候兩個項目會被一個接一個地處理而沒有延遲。猜測發生的原因是,在等待Take時,另一個計時器事件排隊,而不等待上一個事件完成處理。我的代碼:http://pastebin.com/RRZ0ffBB – Athari 2012-07-25 20:50:05

0

考慮使用緩衝區。

http://msdn.microsoft.com/en-us/library/hh212130(v=vs.103).aspx

正如其名稱所示,它緩衝事件的觀察到的數據流中的元素沒有「滴」任何人。

-G

+2

'緩衝區'產生一個列表在每次迭代,但我需要一個單一的項目。在問題示例中,Web服務不支持批量請求。 – Athari 2012-07-26 03:00:09

2

怎麼樣一個簡單的擴展方法:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 

用法:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1)); 
0
.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any()) 
.Subscribe(buffer => 
{ 
    foreach(var item in buffer) Console.WriteLine(item) 
}); 
+0

這裏有同樣的問題:當你只需要一個項目時返回一個序列的開銷,或者什麼也沒有(等待,然後返回下一個項目,等等) – abatishchev 2014-09-03 21:24:19

3

我想建議使用Observable.Zip的方法:

// Incoming requests 
var requests = new[] {1, 2, 3, 4, 5}.ToObservable(); 

// defines the frequency of the incoming requests 
// This is the way to emulate flood of incoming requests. 
// Which, by the way, uses the same approach that will be used in the solution 
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;}); 
incomingRequests.Subscribe((number) => 
{ 
    Console.WriteLine($"Request received: {number}"); 
}); 

// This the minimum interval at which we want to process the incoming requests 
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1)); 

// Zipping incoming requests with the interval 
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;}); 

requestsToProcess.Subscribe((number) => 
{ 
    Console.WriteLine($"Request processed: {number}"); 
}); 
+0

我正在尋找一個rxjs的解決方案,這也只是在那裏工作。非常感謝! :) – Sammi 2017-10-11 09:17:06

0

我正在玩這個,發現。郵編(如前所述)是最簡單的方法:

var stream = "ThisFastObservable".ToObservable(); 
var slowStream = 
    stream.Zip(
     Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay 
     (x, y) => x); // We just care about the original stream value (x), not the interval ticks (y) 

slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}")); 

輸出:

T arrived after 00:00:01.0393840 
h arrived after 00:00:00.9787150 
i arrived after 00:00:01.0080400 
s arrived after 00:00:00.9963000 
F arrived after 00:00:01.0002530 
a arrived after 00:00:01.0003770 
s arrived after 00:00:00.9963710 
t arrived after 00:00:01.0026450 
O arrived after 00:00:00.9995360 
b arrived after 00:00:01.0014620 
s arrived after 00:00:00.9993100 
e arrived after 00:00:00.9972710 
r arrived after 00:00:01.0001240 
v arrived after 00:00:01.0016600 
a arrived after 00:00:00.9981140 
b arrived after 00:00:01.0033980 
l arrived after 00:00:00.9992570 
e arrived after 00:00:01.0003520 
相關問題