2016-02-03 99 views
5

我是RxJS的新手,想知道是否有人可以幫助我。來自請求流的響應的同步流與RxJS

我想從請求流(有效負載數據)創建一個同步的響應流(最好與相應的請求)。

我基本上希望逐個發送請求,每個請求都等待最後一個響應。

我想這一點,但它在一次發送的一切(jsbin):

var requestStream, responseStream; 
 
requestStream = Rx.Observable.from(['a','b','c','d','e']); 
 

 
responseStream = requestStream.flatMap(
 
    sendRequest, 
 
    (val, response)=>{ return {val, response}; } 
 
); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('result for '+val);},1000); 
 
    }); 
 
};

以下的作品,在一定程度上,但請求數據不使用流(jsbin )。

var data, responseStream; 
 
data = ['a','b','c','d','e']; 
 
responseStream = Rx.Observable.create(observer=>{ 
 
    var sendNext = function(){ 
 
    var val = data.shift(); 
 
    if (!val) { 
 
     observer.onCompleted(); 
 
     return; 
 
    } 
 
    sendRequest(val).then(response=>{ 
 
     observer.onNext({val, response}); 
 
     sendNext(); 
 
    }); 
 
    }; 
 
    sendNext(); 
 
}); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500); 
 
    }); 
 
};

謝謝!

編輯:

只是爲了澄清,這就是我想實現:

「發送A,當您收到的響應,發送B,當你收到B響應,發送C,等等......」

使用concatMap和推遲,由user3743222的建議,似乎這樣做(jsbin):

responseStream = requestStream.concatMap(
    (val)=>{ 
    return Rx.Observable.defer(()=>{ 
     return sendRequest(val); 
    }); 
    }, 
    (val, response)=>{ return {val, response}; } 
); 

回答

3

嘗試在第一個代碼示例中替換flatMapconcatMap,並讓我知道結果行爲是否與您正在查找的內容相對應。

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    sendRequest, 
    (val, response)=>{ return {val, response}; } 
); 

基本上concatMap也有類似的簽名比flatMap,在行爲上是它會等待目前觀察到的被壓扁與下一個繼續之前完成的差異。所以在這裏:

  • a requestStream值將被推送到concatMap運營商。
  • concatMap操作者將產生一個sendRequest可觀察到的,以及任何值指出,觀察到的(似乎是一個元組(val, response))將通過選擇器功能被傳遞和該對象結果將被傳遞到下游
  • sendRequest完成後,將處理另一個requestStream值。
  • 總之,你的請求將通過一個

處理一個或者,也許你想使用defer推遲sendRequest的執行。

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    function(x){return Rx.Observable.defer(function(){return sendRequest(x);})}, 
    (val, response)=>{ return {val, response}; } 
); 
+0

謝謝你的迴應。我嘗試了您的解決方案,但請求仍然全部立即發送。文檔建議flatMap可能會導致交織,而concatMap不會。看起來差別在於排序。使用concatMap是有意義的,但它仍然不會產生所需的行爲:發送A,當您收到A的響應,發送B,收到B的響應,發送C等等。 – jamesref

+0

也許我誤解了你想要的內容。你能否在這種情況下嘗試「推遲」?我會更新代碼 – user3743222

+0

謝謝!它似乎在工作。 – jamesref