1
我是rxjs的新手。我在下面調用一個函數,讀完整個流並打印讀取控制檯語句,但我從來沒有看到「Subscibe done」,我不知道爲什麼。要完成此流將需要什麼?是明顯的錯嗎?RxJS訂閱永遠不會完成
const readline$ = RxNode.fromReadLineStream(rl)
.filter((element, index, observable) => {
if (index >= range.start && index < range.stop) {
console.log(`kept line is ${JSON.stringify(element)}`);
return true;
} else {
console.log(`not keeping line ${JSON.stringify(element)}`);
return false;
}
})
.concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
.do(response => console.log(JSON.stringify(response)));
readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); },
err => { console.error(`Subscribe error: ${util.inspect(err)}`); },
done => { console.log("Subscribe done."); // NEVER CALLED
anotherFunc(); // NEVER CALLED
}
);
我不確定這個*特定的observable *,但不是每個observable都會完成。 –
你傳遞給'RxNode.fromReadLineStream'的節點流是什麼?流本身結束了嗎? – cartant
是的,它是一個有限長度文件的RxNode.fromReadLineStream。 –