5
我是RxJS的新手,想知道是否有人可以幫助我。來自請求流的響應的同步流與RxJS
我想從請求流(有效負載數據)創建一個同步的響應流(最好與相應的請求)。
我基本上希望逐個發送請求,每個請求都等待最後一個響應。
我想這一點,但它在一次發送的一切(jsbin):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
以下的作品,在一定程度上,但請求數據不使用流(jsbin )。
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
謝謝!
編輯:
只是爲了澄清,這就是我想實現:
「發送A,當您收到的響應,發送B,當你收到B響應,發送C,等等......」
使用concatMap和推遲,由user3743222的建議,似乎這樣做(jsbin):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
謝謝你的迴應。我嘗試了您的解決方案,但請求仍然全部立即發送。文檔建議flatMap可能會導致交織,而concatMap不會。看起來差別在於排序。使用concatMap是有意義的,但它仍然不會產生所需的行爲:發送A,當您收到A的響應,發送B,收到B的響應,發送C等等。 – jamesref
也許我誤解了你想要的內容。你能否在這種情況下嘗試「推遲」?我會更新代碼 – user3743222
謝謝!它似乎在工作。 – jamesref