2017-07-20 54 views
1

我使用websocket從硬件接收數據幀。 數據幀的定義如下:如何使用RxJS從arrayBuffer拆分數據框?

0xbb(head) ---data--- 0xee(tail)

所接收的數據是存儲在Uint8Array,可以存在多個幀:

var buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]); 

,我可以在陣列轉換爲可觀察到的:

var obs= Rx.Observable.from(buffer);

RxMarbles:

--0xbb--0--0--0--0xee--0xbb--1--1--1--0xee--0xbb--2--2--2--0xee------ 
------------------000------------------111------------------222------ 

如何使用RxJS來拆分observable? 要使用哪些運營商? RxJS是這種情況的最佳實踐嗎?

+0

你的意思是你想將它轉換爲'Observables'數組? – CozyAzure

+0

是的,我想要將每個數據幀轉換爲可觀察的數字 – rhyttr

+0

@rhyttr什麼定義了每個Observable中的項目數? – martin

回答

2

const source = Rx.Observable 
 
    .from(['0xbb','0','0','0','0xee','0xbb','1','1','1','0xee','0xbb','3','3','3','0xee']) 
 
    .concatMap(i => Rx.Observable.of(i).delay(1)); 
 

 
source 
 
    .filter(i => i != '0xee' && i != '0xbb') 
 
    .buffer(source.filter(i => i === '0xee')) 
 
    .subscribe(val => console.log(val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

我們需要將這些值轉換通過,否則緩衝區的closingNotifier運行速度更快,然後值的攝取利用concatMap(.. delay(1))以異步和你結束了三個空數組。但是,既然你從websocket收到這些數據包,你已經是異步的了。

此代碼不是100%萬無一失的,例如當外部設備不發射時會發生什麼0xee?我們最終將下一條消息連接到前一條消息。

+0

感謝您的回答,您提醒我字節緩衝區需要轉換爲時間流,因此RxJS可以運行。但延遲concatMap(.. delay(1))會在快速和大量的通信時發生性能問題嗎? – rhyttr

+2

你也可以使用'observeOn(async)'而不是'delay(1)'。 'async'來自'rxjs/scheduler/async'的import {async};' – martin

2

我想你可以達到你想要的東西與scan()

const buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]); 
const obs = Observable.from(buffer); 

obs.scan((acc, v) => { 
    if (v === 0xbb) { 
     return [v]; 
    } else { 
     acc.push(v); 
     return acc; 
    } 
    }, []) 
    .filter(acc => acc[acc.length - 1] === 0xee) 
    .subscribe(console.log); 

這將打印以下陣列:

[ 187, 0, 0, 0, 238 ] 
[ 187, 1, 1, 1, 238 ] 
[ 187, 3, 3, 3, 238 ] 

不過,如果你需要發出與每個數據幀觀測量然後按照馬克的回答只需使用window()而不是buffer()

+0

是的,我需要frameable的可觀察性,但是謝謝你的回答,這是一種選擇方式! – rhyttr