2016-04-13 87 views
8

我可以使用Retrofit + RxJava聽到層出不窮的流?例如Twitter流。我的是這樣的:Android Retrofit 2 + RxJava:聽不完的流

public interface MeetupAPI { 
    @GET("http://stream.meetup.com/2/rsvps/") 
    Observable<RSVP> getRSVPs(); 
} 

MeetupAPI api = new Retrofit.Builder() 
      .baseUrl(MeetupAPI.RSVP_API) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .build() 
      .create(MeetupAPI.class); 

api.getRSVPs() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(rsvp -> Log.d(TAG, "got rsvp"), 
       error -> Log.d(TAG, "error: " + error), 
       () -> Log.d(TAG, "onComplete")); 

但是「onComplete」在第一個對象被分析後被調用。有沒有辦法告訴Retrofit保持開放狀態,直至另行通知?

回答

9

這裏我的解決辦法:

可以使用@Streaming註釋:

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

隨着@Streaming我們可以得到原始輸入從ResponseBody

這裏我的功能包裝體通過線與事件分爲:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

,並導致使用:

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

UPD約優雅地停止

當我們退訂,改造關閉的InputStream。但是不可能從輸入流本身檢測輸入流是否關閉,所以唯一的方法 - 嘗試從流中讀取 - 我們通過Socket closed消息得到異常。 我們可以解釋這個異常截至收盤:

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

如果連接被關閉,因爲響應結束,我們沒有任何異常。這裏檢查一下isCompleted。讓我知道如果我錯了:)

+0

謝謝!有關如何正常停止監聽流的任何提示? – ticofab