2016-07-17 55 views
1

我試圖在使用RxJava時從Retrofit 2.1向Spotify Web API發出請求。我希望每個請求都可以在自己的線程中執行,並且在準備就緒時按任意順序打印結果。onNext未在Retrofit和RxJava中調用線程時運行

當前的代碼在主線程上執行並工作。但是,當我在Rx鏈中插入.subscribeOn(Schedulers.newThread())時,我得到一個空輸出。 看來,onNext()永遠不會被調用。

我看到這通常是通過在Android中插入.observeOn(AndroidSchedulers.mainThread())來修復的,但是我怎樣才能在普通的Java 8(沒有RxAndroid)中做到這一點?

public interface SpotifyService { 

    @GET("tracks/{trackId}") 
    Observable<SpotifyTrack> getTrack(@Path("trackId") String id); 

    Retrofit retrofit = new Retrofit.Builder() 
      .baseUrl("https://api.spotify.com/v1/") 
      .addConverterFactory(JacksonConverterFactory.create()) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build(); 
} 


public class Main { 

    private static SpotifyService spotifyService = SpotifyService.retrofit.create(SpotifyService.class); 

    public static void main(String[] args) { 
     String[] trackIds = { 
      "spotify:track:2HUI2s84pkL5815G8WI1Lg", 
      "spotify:track:1bZrI1KgVKr8Qfja9cnmGh", 
      "spotify:track:1WP1r7fuvRqZRnUaTi2I1Q", 
      "spotify:track:5kqIPrATaCc2LqxVWzQGbk", 
      "spotify:track:0mWiuXuLAJ3Brin3Or2x6v", 
      "spotify:track:0BF6mdNROWgYo3O3mNGrBc" 
     }; 

     printTrackNames(trackIds); 
    } 

    private static void printTrackNames(String[] trackIds) { 
     Observable.from(trackIds) 
      .map(Main::toTrackId) 
      .flatMap(spotifyService::getTrack) 
      .map(SpotifyTrack::getName) 
      .subscribe(System.out::println); 
    } 

    private static String toTrackId(String track) { 
     if (track.contains("spotify:track:")) { 
      return track.split(":")[2]; 
     } 

     return track; 
    } 
} 
+2

那是因爲你的main()方法返回,並處理被關閉(調度器使用守護線程)。出於演示目的,您可以阻止當前線程,並可能獲得預期的結果。一種方式是通過在printTrackNames():'Observable.empty()。延遲(10,TimeUnit.SECONDS).toBlocking()。subscribe();' –

+0

'後添加'Thread.sleep()'或延遲阻止observable當然。我沒有想過這個。感謝您的回答!如果你把它變成答案,我會把它標記爲答案。 – skovmand

回答

1

既然你沒有運行的循環將繼續執行就像你在Android的擁有,你將需要阻止主線程,因此不會退出main()及認購事項完成前死亡。你可以這樣通過使用toBlocking()

1

這不是關於那不會onNext,但因爲它在另一個線程中,你不能調試它。

爲了證明這一點,請使用TestSubscriber等待觀察者的完成。

這裏有一個單元測試來證明它。

@Test 
public void testObservableAsync() throws InterruptedException { 
    Subscription subscription = Observable.from(numbers) 
              .subscribeOn(Schedulers.newThread()) 
              .subscribe(number -> System.out.println("Items emitted:" + total)); 
    System.out.println("I finish before the observable finish. Items emitted:" + total); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

如果想看到更多ASYN測試到這裏看看https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

相關問題