2011-10-07 17 views
1

我有一個IObservable<byte[]>,它給了我在字節數組中的不確定數量的字節。我想知道我是如何去的,在每個字節數組中返回帶有一定字節數的IObservable<byte[]>。假設我們一次只需要10個字節。轉換具有不規則長度字節數組的IObservable <byte[]>到具有規則長度數組的IObservable <byte[]>

這就是說,如果我碰到下面的輸入,如果我要訂閱:

{1, 2, 3, 4} 
{5, 6} 
{7, 8, 9} 
{10} 
{11, 12, 13, 14, 15} 
{16} 
{17, 18} 
{19, 20} 

Bytes.Subscribe(b => Console.WriteLine(b.Length)); 

輸出將

3 
2 
3 
1 
5 
1 
2 
2 

我想什麼是將輸入轉化上述成這樣的:

{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} 
{11, 12, 13, 14, 15, 16, 17, 18, 19, 20} 

Bytes.<WhateverItTakesToDoThat>.Subscribe(b => Console.WriteLine(b.Length)); 

輸出將

10 
10 

它還必須工作,如果字節的量來在於比單個輸出分組較大,即:

{21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32} 
{33, 34, 35, 36, 37, 38, 39, 40, 41} 
{42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52} 

應該變成

{21, 22, 23, 24, 25, 26, 27, 28, 29, 30} 
{31, 32, 33, 34, 35, 36, 37, 38, 39, 40} 
{41, 42, 43, 44, 45, 46, 47, 48, 49, 50} 

(和被保持在到{51,52},等待更多的輸入來)

回答

4

這很容易。試試這個:

Bytes 
     .SelectMany(b => b) 
     .Buffer(10) 
     .Select(bs => bs.ToArray()); 
+0

在我的版本Rx的(1.0.10621)只的SelectMany返回從IEnumerable的,和IEnumerable <>沒有一個緩衝的擴展方法,所以我不得不添加之間的ToObservable()調用SelectMany()和Buffer()。之後,它的作品非常漂亮,謝謝。 :-) – Alex

+0

@Alex - 這就意味着「Bytes」是一個IEnumerable 而不是IObservable ,正如問題所暗示的那樣。嘗試在'SelectMany'之前放置'ToObservable'。這也應該起作用。 – Enigmativity

+0

是的,你說得對。這是我的錯;在測試過程中,我使用IEnumerable 來僞造輸入,而不用記住首先對其執行ToObservable()。實際輸入將直接是IObservable 。對於誤會感到抱歉。 :-) – Alex

1

我想出了一個解決方案,經過一些思考和修補。下面的代碼做什麼,我想:

Bytes.Select(b => b.ToObservable()) // Convert input to IObservable<IObservable<byte>> 
.Merge(1) // Merges the IObservable<IObservable<byte>> to an IObservable<byte> 
      // with the bytes in the right order 
.Buffer(4) // Wait until we have 4 bytes ready 
.Select(bl => bl.ToArray()) // Take these 4 bytes and turn them back into an array 
.Subscribe(b => Console.WriteLine(b.Length)); 

這可能是低效的,而且我幾乎可以肯定它不是這樣做的,他最有效的方法,所以如果有人在那裏能想出一個更好的,更有效解決方案,我都是耳朵!

相關問題