2017-03-05 57 views
2

好的,所以我是一個完整的Rx初學者,不幸的是js和js流也很新。我使用這個https://github.com/trygve-lie/twitter-stream-api來連接twitters streaming api並接收json對象和tweets。到目前爲止,我有這樣的代碼訂閱一個包含RxJS和twitter-stream-api模塊的流

var Rx = require('rxjs/Rx'); 

var TwitterStream = require('twitter-stream-api'), 
    fs = require('fs'); 
var filter = 'tweet'; 
var keys = { 
    consumer_key : "key", 
    consumer_secret : "secret", 
    token : "token", 
    token_secret : "tokensecret" 
}; 

var Twitter = new TwitterStream(keys); 
Twitter.stream('statuses/filter', { 
    track: filter 
}); 

Twitter.on('connection success', function (uri) { 
    console.log('connection success', uri); 
}); 
Twitter.on('data', function (obj) { 
    console.log(obj.text); 
}); 

我寫成功鳴叫控制檯,但我真的很努力學習正在與流,特別是RxJS。我嘗試了所有我能想到的方式來創建一個可觀察的事物。 Rx.Observable。創建/從等...

我也試過Twitter.resume(),因爲它顯然暫停默認恢復流並觀察。我只收到錯誤,如不能.subscribe不是一個函數。從我上面的內容來看,我如何使用Rx.Observable來開始過濾和使用數據?

謝謝!

回答

2

RxJS 5沒有任何方法將流轉換爲Observable,因此您需要自行完成此操作。理想情況下與Observable.create

const Rx = require('rxjs'); 
const Observable = Rx.Observable; 

var TwitterStream = require('twitter-stream-api'), 

... 

var source$ = Observable.create(observer => { 
    var Twitter = new TwitterStream(keys); 
    Twitter.stream('statuses/filter', { 
    track: filter 
    }); 

    Twitter.on('data', function (obj) { 
    observer.next(obj); 
    }); 

    return() => { 
    Twitter.close(); 
    }; 
}); 

這使得只有當您訂閱它時纔會連接到Twitter的Cold Observable。 Observable.create靜態方法可讓您將值推送給觀察者,並在最後返回一個拆除函數,然後關閉連接。當您取消訂閱或Observable完成時調用此函數。

然後你可以鏈這種可觀察任何你想要的:

source$.filter(...).map(...) 

注意,有是還的方法Observable.bindCallback()Observable.bindNodeCallback(),但這些不會幫助你多少在你的情況。

瞭解更多:

+0

非常感謝您的回答!我還沒有時間正確地嘗試它。快速問題,因爲我得到'Observable未定義',它不應該是Rx.Observable.create()?或者我需要(進口)更多的東西? – JimJimL

+0

@JimJimL現在看到代碼,我也加了'require()'調用。 – martin

0

下面是使用desmondmorris /節點的Twitter一個例子和rxjs 5.

const Observable = require('rxjs').Observable; 
 

 
Observable 
 
    .of(new require('twitter')({ 
 
    consumer_key: 'xxxx', 
 
    consumer_secret: 'xxxx', 
 
    access_token_key: 'xxxx', 
 
    access_token_secret: 'xxxx', 
 
    })).mergeMap(twitter => 
 
    Observable 
 
    .fromEvent(twitter.stream('statuses/filter', { 
 
     track: 'Stack Overflow' 
 
     }), 
 
     'data')) 
 
    .filter(tweet => tweet.user.follow_count > 10000) 
 
    .subscribe(console.log);