2016-12-28 121 views
2

我試圖promisify流,但它似乎比我預期的更難。這裏是我的嘗試:Promisify streams

'use strict' 

const Promise = require('bluebird') 
const Twitter = require('twitter') 

const TwitterStream = module.exports = function TwitterStream (config) { 
    // init Twitter Streaming API for OAuth 
    this.stream = new Twitter({ 
    consumer_key: config.get('/twitter/consumerKey'), 
    consumer_secret: config.get('/twitter/consumerSecret'), 
    access_token_key: config.get('/twitter/accessTokenKey'), 
    access_token_secret: config.get('/twitter/accessTokenSecret') 
    }) 
    .stream('statuses/filter', { 
    track: config.get('/twitter/track') 
    }) 
} 

TwitterStream.prototype.receive = function() { 
    return new Promise((resolve, reject) => { 
    this.stream.on('data', resolve).on('error', reject) 
    }) 
} 

TwitterStream.prototype.destroy = function() { 
    this.stream.destroy() 
} 

的主要問題是,當我創建對象

const stream = new TwitterStream(config) 

stream.receive().then((data) => console.log(data)) 

當我執行只有一個對象被讀取。沒有其他數據流式傳輸。

TwitterStream.prototype.receive = function() { 
    return new Promise((resolve, reject) => { 
     this.stream 
     .on('data', (data) => resolve(data) 
     .on('error', (error) => reject(error)) 
     }) 
    } 
+0

我不知道TwitterStream api。這個流是無限期打開的,只要有新數據(tweets)可用,就會發出'data'事件。而'end'只在連接中斷時才被調用? –

+0

@ t.niese完全是 – Mazzy

+1

您無法直接將Promises映射到這種類型的流。我會嘗試創建一個例子,你將如何解決這個問題。 –

回答

0

您需要在stream.on函數的回調中返回承諾。此時,被調用時的方法receive僅返回一次解析的返回值或錯誤。

+0

你的意思是? (編輯該問題)我試過了,但它不起作用 – Mazzy

+0

仍然不起作用 – Mazzy

+0

是的,因爲該函數在調用時會返回一個Promise。如果它不是被調用一次以上,則該流不會被讀取多次。註冊一個事件處理程序是解決這個問題的更好方法。我的答案本身是錯誤的,因爲我沒有正確理解這個問題。我的錯。 – Bhargav

0

使用的Rx擴展,這是非常簡單的:

TwitterStream.prototype.receive = function() { 
    return Rx.Observable.create((observer) => { 
     this.stream 
      .on('data', (data) => observer.onNext(data)) 
      .on('error', (err) => observer.onError(err)); 
    }); 
} 

然後

const stream = new TwitterStream(config) 

stream.receive().subscribe((data) => console.log(data)); 
+0

儘管它看起來多餘(因爲訂閱時你仍然需要傳遞一個回調),但它與使用承諾時具有相同的好處:您可以應用諸如map,compose等操作,這些操作相當有用。 – olivarra1

0

下面是一個沒有經過測試,最有可能仍bug的代碼來說明你能如何與承諾做到這一點:

function defer() { 
    var resolve, reject; 
    var promise = new Promise(function() { 
    resolve = arguments[0]; 
    reject = arguments[1]; 
    }); 
    return { 
    resolve: resolve, 
    reject: reject, 
    promise: promise 
    }; 
} 


TwitterStream.prototype.receive = function() { 

    this.stream 
    .on('data', data => { 
     this.dataCache = this.dataCache || []; 
     this.dataCache.push(data); 
     this.tryToSendData() 
    }) 
    .on('end',() => { 
     this.finished = true; 
     this.tryToSendData() 
    }) 
    .on('error', err => { 
     this.lastError = err; 
     // error handling still missing 
    }) 

    return this; 
} 

TwitterStream.prototype.tryToSendData = function() { 
    if (this.defered) { 
    let defered = this.defered; 
    this.defered = null; 

    // if data is available or finished then pass the first element of buffer (or undefined) 
    defered.resolve(this.dataCache.shift()) 
    } 
} 

TwitterStream.prototype.getNextData = function() { 
    if (this.dataCache.length > 0 || this.finished) { 
    // if data is available or finished then pass the first element of buffer (or undefined) 
    return Promise.resolve(this.dataCache.shift()); 
    } else { 
    // otherwise we need a defered object 
    this.defered = defer(); 
    } 
} 

然後用法如下:

stream.receive().getNextData() 
    .then(function processData(data) { 
    if (data) { 
     console.dir(data); 

     // if data is available then continue requestin the data 
     return stream.getNextData().then(processData); 
    } 
    }) 

這是一種罕見的情況,您可以使用Deferreds。

0

我想你可能想看看我在scramjet已經promisified流。

對於你的Twitter例如該代碼應該很好地工作:

const stream = new Twitter({ 
    consumer_key: config.get('/twitter/consumerKey'), 
    consumer_secret: config.get('/twitter/consumerSecret'), 
    access_token_key: config.get('/twitter/accessTokenKey'), 
    access_token_secret: config.get('/twitter/accessTokenSecret') 
}) 
.stream('statuses/filter', { 
    track: config.get('/twitter/track') 
}) 
.pipe(new scramjet.DataStream) 

然後執行任何你喜歡的轉變......例如某種程度上映射流和累積流到一個數組時,即可大功告成。

stream.map(
    function (a) { return modifyTheTweetSomehow(a); } // a Promise can be returned here 
).accumulate(
    function(a, i) { a.push(i); }, 
    [] 
) // this returns a Promise that will be resolved on stream end. 

我希望你喜歡它。 :)