2016-12-02 115 views
1

我試圖用從Google工作表中刪除的值填充我的數據庫。 hasuraHelper.insert()向我的數據庫發送一個HTTP帖子。我如何做到這一點,以便只有在前一個API調用返回後纔會調用insert()?我認爲concatMap可以做到這一點,但它似乎仍然熱切地贊同所有的排放。RxJS節點等待觀察完成,然後按順序訂閱下一個

目前:1請求 - >請求2 - > - >請求1個完成 - >請求2個完成 - > ...

我想什麼:請求1 - >請求1個完成 - >請求2 - >請求2個完成 - > ...

下面是相關代碼:

sheetsHelper.authToken() 
    .flatMap(sheetsHelper.get) 
    .flatMap(response => response.values) //values is a 2d array 
    .map(row => { 
     console.log(row[0]); 
     const flights = []; 
     //add 100-500 objects to this array 
     return flights; 
    }) 
    .flatMap(flights => { 
     const arrayOfFlightArrays = []; 
     //max batch size of 50 
     while (flights.length) { 
      const flightArr = flights.splice(0, 50); 
      arrayOfFlightArrays.push(flightArr); 
     } 
     return Rx.Observable.from(arrayOfFlightArrays); 
    }) 
    .concatMap(flights => hasuraHelper.insert(flights) //insert flights into table 
     .retry()) 
    .map(response => response.data) 
    .subscribe(console.log, console.error); 

這是插入的樣子:

exports.insert = function (objects) { 
    return Rx.Observable.fromPromise(axios({ 
     method: 'post', 
     url: '/', 
     data: { 
      type: "insert", 
      args: { 
       table: "flights", 
       objects: objects 
      } 
     } 
    })); 
}; 

回答

2

您的插入功能不是懶惰的。你正在執行一個Observable的承諾,但它已經在執行。

爲了得到一個承諾變得懶惰,開始處理訂閱時,你需要用它Rx.Observable.defer(() => Rx.Observable.fromPromise(axios(/* ... */))

+0

嗯,所以concatMap消耗排放盡快它可以創建可觀察到的,但只訂閱他們一次以前一個完成? –

+0

正確!大多數時候這不是問題,因爲常見的模式是觀察者很冷(=在訂閱之前不做事)。 –

相關問題