我有一個sourceStream
組成的BaseData
對象。如何使用highland.js分叉流?
我想分流這個流到n
-不同的流的數量,然後過濾和轉換每個BaseData
對象到他們的喜好。
最後,我想要n
只包含特定類型的流,並且分叉的流的長度可能會有所不同,因爲可能會在未來刪除或添加數據。
我想我可以通過fork
設置它這樣的:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
我預計現在有兩個流,以及foo
-stream包含兩個元素:
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
和bar
包含一個元素的流:
{ id: 'bar', data: 'narf' }
然而,我卻遇到了一個錯誤:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
如何將流分成多個流?
我也試圖鏈接的電話,但當時我只拿回一個流的結果:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
打印只:而不是預期的
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
也可能是我誤解的原因是fork
就是這樣。按its doc entry:
Stream.fork()叉流,使您可以添加額外的消費者 共享背壓。一條分流給多個消費者的流將只會從最低消費者處理它們的速度中儘可能快地從其來源獲取值。
注意:不要依賴叉子之間的一致執行順序。 此變換僅保證所有分支都將在任何處理第二個值欄之前處理值foo 。它並不保證叉的處理順序爲 。
提示:小心修改叉子中的流值(或使用這樣的庫來修改 )。 因爲每個分支都會傳遞相同的值到 ,所以在一個分支中所做的更改將在其後執行的任何分支中可見。再加上不一致的執行順序,以及 ,最終可能會產生細微的數據損壞錯誤。如果您需要修改 的任何值,則應該複製並修改副本。
廢棄警告:目前可能在使用 後(例如,通過轉換)對流進行分叉。在下一個主要版本中,這將不再是可能的 。如果你要分岔一個流,總是 呼籲分叉。
因此,而不是「如何叉流?」我的實際問題可能是:如何在高速流中複製高地流到不同的流中?
我已經更新了我的問題就像現在我沒有看到一個錯誤,但我只看到一個流的返回值,而不是他們兩個。 – k0pernikus