2017-02-06 88 views
0

我有一個sourceStream組成的BaseData對象。如何使用highland.js分叉流?

我想分流這個流到n-不同的流的數量,然後過濾和轉換每個BaseData對象到他們的喜好。

最後,我想要n只包含特定類型的流,並且分叉的流的長度可能會有所不同,因爲可能會在未來刪除或添加數據。

我想我可以通過fork設置它這樣的:

import * as _ from 'highland'; 

interface BaseData { 
    id: string; 
    data: string; 
} 

const sourceStream = _([ 
    {id: 'foo', data: 'poit'}, 
    {id: 'foo', data: 'fnord'}, 
    {id: 'bar', data: 'narf'}]); 

const partners = [ 
    'foo', 
    'bar', 
]; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream.fork(); 

    partnerStream.filter((baseData: BaseData) => { 
     return baseData.id === partner; 
    }); 

    partnerStream.each(console.log); 
}); 

我預計現在有兩個流,以及foo -stream包含兩個元素:

{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 

bar包含一個元素的流:

{ id: 'bar', data: 'narf' } 

然而,我卻遇到了一個錯誤:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338 
     throw new Error(
     ^

Error: Stream already being consumed, you must either fork() or observe() 
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15) 
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10) 
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18) 
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19) 
    at Array.forEach (native) 
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10) 
    at Module._compile (module.js:570:32) 
    at Object.Module._extensions..js (module.js:579:10) 
    at Module.load (module.js:487:32) 
    at tryModuleLoad (module.js:446:12) 

如何將流分成多個流?


我也試圖鏈接的電話,但當時我只拿回一個流的結果:

partners.forEach((partner: string) => { 
    console.log(partner); 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     }); 

    partnerStream.each((item: BaseData) => { 
     console.log(item); 
    }); 
}); 

打印只:而不是預期的

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 
{id: 'bar', data: 'narf'} 

也可能是我誤解的原因是fork就是這樣。按its doc entry

Stream.fork()叉流,使您可以添加額外的消費者 共享背壓。一條分流給多個消費者的流將只會從最低消費者處理它們的速度中儘可能快地從其來源獲取值。

注意:不要依賴叉子之間的一致執行順序。 此變換僅保證所有分支都將在任何處理第二個值欄之前處理值foo 。它並不保證叉的處理順序爲 。

提示:小心修改叉子中的流值(或使用這樣的庫來修改 )。 因爲每個分支都會傳遞相同的值到 ,所以在一個分支中所做的更改將在其後執行的任何分支中可見。再加上不一致的執行順序,以及 ,最終可能會產生細微的數據損壞錯誤。如果您需要修改 的任何值,則應該複製並修改副本。

廢棄警告:目前可能在使用 後(例如,通過轉換)對流進行分叉。在下一個主要版本中,這將不再是可能的 。如果你要分岔一個流,總是 呼籲分叉。

因此,而不是「如何叉流?」我的實際問題可能是:如何在高速流中複製高地流到不同的流中?

回答

0

在創建所有分支之前,必須記住不要使用分叉流。就好像一個使用分叉流,它和它的「父」將被消耗,從而使任何後續分叉從一個空流中分叉。

const partnerStreams: Array<Stream<BaseData>> = []; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     } 
    ); 

    partnerStreams.push(partnerStream); 
}); 

partnerStreams.forEach((stream, index) => { 
    console.log(index, stream); 
    stream.toArray((foo) => { 
     console.log(index, foo); 
    }); 
}); 

它打印:

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ] 
1 [ { id: 'bar', data: 'narf' } ] 
1

partnerStream.filter()返回一個新的流。然後,您再次使用partnerStream,使用partnerStream.each(),而不呼叫fork()observe()。因此,請將partnerStream.filter().each()調用鏈或將返回值partnerStream.filter()分配給一個變量並在其上調用.each()

+0

我已經更新了我的問題就像現在我沒有看到一個錯誤,但我只看到一個流的返回值,而不是他們兩個。 – k0pernikus