2015-01-17 105 views
0

當使用Rx(特別是RxJava)時,是否有一個操作符會將輸入變量與函數的輸出一起打包,以便在下一步中使用它們?任何返回輸入變量和結果的Rx運算符?

例如,假設我從推文ID列表開始,我的程序a)進行REST調用以獲取該推文的文本,並b)將該ID和文本保存到本地數據庫中。普通的「地圖」將返回步驟a的文本,但會丟棄原始ID。在代碼:

Observable.from(tweet_id_list) 
    .specialMap(i -> getTweetText(i)) // is there a "specialMap" which returns the string result AND ALSO the id? 
    .map((i, s) -> saveToLocalDB(i, s) // because we need both for the ensuing function 

我砍死getTweetText(i)與兩個變量,而不是一個簡單的字符串返回一個HashMap,但如果有一個的Rx運營商可能做到這一點,而無需修改底層函數這將是非常有用的。

+0

除非你有兩個地圖之間的事情,你可以簡單地將兩者結合成一個單一的功能:src.map(I - > saveToLocalDB(I,getTweetText(I)))。 – akarnokd

回答

2

創建或使用通用對類型或元組類型並使用它來映射您的流。事情是這樣的:

Observable.from(tweet_id_list) 
      .map(id -> Pair.create(id,getTweetText(id)) 
      .map(pair -> saveToLocalDB(pair.first(),pair.second()) 
+0

這是最簡單,最輕量,最容易實現的模式。謝謝。 – ExactaBox

1

如果你想有一個完整的地圖與所有的ids->填充文本:

Observable.from(tweet_id_list) 
    .toMap((id) -> id, (id) -> getTweetText(id)); 
    .subscribe((tweetMap) -> saveAllToLocalDB(tweetMap)); 

這將發出(假設整數ID和字符串文本)的單個地圖。如果您可以將整個地圖一次保存到數據庫中,而不是像原始代碼中那樣執行多個保存調用,這將非常有用(如上所述)。

你想確保在可觀察的調用中避免副作用。數據庫保存等副作用應該只發生在您進行訂閱呼叫並觀察結果時,而不是在地圖本身。如果您需要在後臺進行保存,則可以在.subscribe調用之前添加諸如.observeOn(Schedulers.io())之類的內容,具體取決於您的線程需求。

如果您希望獲得單個結果並逐個保存它們,您可能希望使用平面圖,以便可以並行執行查找。我在這裏使用defer,以便我們不會立即阻止getTweetText調用,而是等待每個訂閱。這使我們能夠控制我們所從事的工作。請注意,由於請求可能是併發的,因此結果不一定與tweet_id_list的順序相同。

Observable.from(tweet_id_list) 
    .flatMap((id) -> Observable.defer(() -> Observable.just(
     Pair.create(id, getTweetText(id))).subscribeOn(Schedulers.io())) 
    .subscribe((pair) -> saveToLocalDB(pair.first(), pair.second()) 
+0

感謝您的廣泛響應。爲了清楚起見,我在我的問題中使用了一個簡化的例子,它沒有涉及線程,所以謝謝指出。使用'toMap'和整合數據庫(或IO,網絡等)操作對於許多用例來說是一個很好的主意。但是,既然你已經提出了它,請幫我解釋一下你的最後一個代碼塊中的「延遲」。這是爲了確保每個'Observable '被分配給不同的用戶? – ExactaBox