2016-02-26 62 views
0

下面是我的確切代碼。注意foo(o)在訂閱lambda函數內被調用。這給Android提供了嚴格的模式錯誤。現在我當然可以在這裏分解一個線程,但這是Rxjava做事的方式?我現在不得不再次考慮線程嗎?想在這裏知道我最好的選擇。我怎麼能這樣做RxJava的方式,使其他部分的代碼可以訂閱新的事件,這將返回網絡呼叫響應?所以基本上,我通過異步響應將一種類型的事件MyObj轉換爲NetObj響應事件。RxJava - 如何啓動訂閱的lambda函數內的異步網絡調用? Observable

public static final PublishSubject<MyObj> myObjSubject = PublishSubject.create(); 
    public static final Observable<MyObj> observable = myObjSubject.asObservable(); 


protected CompositeSubscription myCompositeSubs = new CompositeSubscription(); 

myCompositeSubs.add(
      observable.subscribe((MyObj o) -> { 
       // `. Update UI with MyObject and 
       // 2. Kick off network call which will return JsonResponse type object which I would like to possibly process here and also publish to let others respond to it. 
       foo(o); // $$$$$ million dollar question what if I am kicking off something long running here? 
      })); 


private void foo(MyObj o){ 

// long running request. How to kick off long running from here? 
    JsonRespons resp = RestAdapter.makeQuest(o.url); 
    // I want this JsonResponse object to be an even others can listen for. I don't want to just process it right here. 

} 

更新:只是爲了闡明我正在嘗試做的是偵聽類(Android Fragment)中的數據事件MyObject。一旦我得到了這個,我想做兩件事情,用MyObject同步更新UI,這是我想要消費的東西,而不僅僅是轉換。但我也想開始一個長期的請求。一個返回JsonObject響應的網絡調用。這可能會在它返回時處理相同的片段,或者我可能希望其他類能夠監聽此事件。所以我並沒有單獨觀察MyObject來進行網絡調用並獲得響應。我需要做兩個操作,一個同步另一個異步。

+0

看起來像這種類型的問題可能已經在這裏討論:https://github.com/ReactiveX/RxJava/issues/1574 – FunctionallyReactive

回答

3

這是使用RxJava運算符(如flatMap)的絕佳機會。如果您需要更多文檔,請參閱here,但flatMap允許您帶一種類型的Observable(您的Observable<MyObj>)並返回另一種類型的Observable(在這種情況下,它可以是Observable<Void>Observable<Boolean>,如果您想知道您的長時間運行請求是否完成)。

看看這個代碼:

Subscription sub = observable.flatMap((MyObj o) -> { 
     // this will be run on your subscribeOn thread -- in background 
     return Observable.just(foo(o)); 
    }).subscribe(aVoid -> { 
     // we have completed both operations here, emissions will be 
     // on our observeOn() thread 
    }); 

如果您foo(MyObj)功能現在結構爲:

private Void foo(MyObj o) { 
    // long running stuff 
    return null; 
} 

它使返回類型Void,而不是void,使我們能夠很重要使用flatMap運算符並獲取返回的Observable<Void>

您所有的長期運行foo()操作將在您的subscribeOn()線程中運行,與您最初的觀察到一起,並在操作完成後,它會發出Void項目到您subscribe()observeOn()線程(這可能是你的主要如果你想通知UI,則在Android中進行線程)。

更新,以允許在flatMap調用UI工作:

注:這是假定第一observable定義subscribeOn()在後臺線程和observeOn()AndroidSchedulers.mainThread()

Subscription sub = observable.flatMap((MyObj o) -> { 
     // << do stuff with MyObj to UI HERE >> 
     return Observable.defer(() -> Observable.just(foo(o))) 
      .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 
    }).subscribe(aVoid -> { 
     // foo() has completed, we're back on the UI thread 
    }); 

如果你想用JsonResponse做一些事情,你可以改變foo()函數返回該函數而不是Void,那麼你就可以使用的結果在UI線程上。

更新2:更改Observable.create()Observable.defer()使其更清潔。

+0

只是爲了澄清我在一個類(Android Fragment),我想以這種方式處理此問題,以便我可以在接收MyObject時採取一些同步操作,但也希望進行異步調用,並在完成時或出錯時執行其他操作。 – FunctionallyReactive

+0

我有單身人士與各種觀察員和主題,但讓我們看看是否我返回在這一類內工作的Objservable.just(foo),但如果我想讓響應在片段外部可見,該怎麼辦?例如,我想要映射到JSonResponse。所以外面的課我希望他們能夠消費JSonResponse聽。外來者聽衆如何利用JSonResponse對象? – FunctionallyReactive

+0

所以我希望Fragment能夠對MyObj採取一些操作,並啓動這個異步操作,它將返回一個JsonResponse。它是我想要能夠傾聽或觀察的那種反應。我可能想要在將它踢出或者其他地方的片段中處理它。所以我說的是單個類將使用MyObj類來更新UI,並且它將啓動異步。一旦返回,我希望它可以發佈,或者有一種方法來監聽流的JsonResponse類型。 – FunctionallyReactive