2017-02-07 60 views
1

我一直遇到這種情況,我希望能夠通過RxJava監聽對請求的響應。問題是我不知道如何設置Observable,以便我正在偵聽事件,並以正確的順序在訂閱上發送消息。我不想發送消息然後聽,因爲如果線程暫停或響應超快,我可能會錯過它。這是我能想到上的最接近我自己收聽併發送訂閱RxJava

connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      ... // do stuff 

Observable.defer(() -> { 
    connection.send(message); 
    return connection.onReceivedMessage(); 
})... // do stuff 

但這些似乎仍像我仍然可以發送郵件,而不是被監聽響應。有沒有其他人試圖做到這一點?我覺得我真的想要一種afterCreate()。

回答

1

我不想發送消息,然後聽,因爲如果線程 被暫停或響應速度超快,我可能會錯過它。

使用SubjectBehaviorSubject(發出始終發射到新用戶的最新可觀測數據)或ReplySubject(發射發射到新用戶的所有Observable)。我不知道整個邏輯,但你可以有這樣的:

public BehaviorSubject mMessageBehaviorSubject = BehaviorSubject.create(); 


private void sendMessage() { 
    connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      .subscribe(mSubject::onNext, Throwable::printStackTrace); 
} 


public Observable<String> getMessageObservable() { 
    return mMessageBehaviorSubject.asObservable(); 
} 

這種方式,你可以發送郵件,只要你願意聽宇會得到,在這種情況下,最新的消息發送

0

doOnSubscribe正是你所需要的,它是你正在尋找的「afterCreate」。在第一個示例中,第一條消息將在訂閱後發送,此時Observable已經準備好處理響應。

至於你的問題 - 有沒有其他人試圖做到這一點? - 答案是肯定的。我使用與第一個示例相同的技術來使某些代碼變得粗糙。

+0

doOnSubscribe這樣的作品?我將不得不進行更多的測試。我承認我從未做過單元測試來驗證。 – Buttink