2017-04-05

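java.net.SocketTimeoutException: timeout when listening to the Meetup open_events streaming API

I want to listen to a never-ending stream from the Meetup open_events streaming API. For some reason I get a SocketTimeoutException after what seems to be a random period of time. The same code works for the Meetup rsvps streaming API. I am using an okio.BufferedSource, as shown below. The exception is thrown at "while (!source.exhausted())"; if I write "while (true)" instead, it is thrown on the next line. As far as I can tell, the difference between rsvps and open_events is that rsvps uses long polling and returns arrays, while open_events uses chunked transfer encoding to keep a persistent connection open. Is the problem that I cannot use a BufferedSource with a chunked response, and if so, what should I use instead?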

public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                // Blocks until more data arrives or the stream ends.
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
                // Only signal completion when the stream ended cleanly.
                subscriber.onCompleted();
            } catch (IOException e) {
                e.printStackTrace();
                // onError is terminal; onCompleted must not follow it.
                subscriber.onError(e);
            }
        }
    });
}
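
On the chunked-transfer point: the stack trace further down passes through `Http1Codec$ChunkedSource`, which suggests that OkHttp already strips the chunk framing before the `BufferedSource` reaches application code. The following is a minimal, dependency-free sketch of what such a decoding step does. It is not OkHttp's implementation, and `ChunkedSketch` and its method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ChunkedSketch {

    /** Reads bytes up to CRLF and returns them as a string, without the CRLF. */
    static String readLine(InputStream in) throws IOException {
        StringBuilder sb = new StringBuilder();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\r') {
                if (in.read() != '\n') throw new IOException("expected LF after CR");
                return sb.toString();
            }
            sb.append((char) b);
        }
        throw new EOFException("stream ended inside a line");
    }

    /** Decodes a complete chunked body into its payload bytes. */
    public static byte[] decode(InputStream in) throws IOException {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        while (true) {
            // Chunk header: hexadecimal size, optionally followed by ";extensions".
            String header = readLine(in);
            int semicolon = header.indexOf(';');
            String hex = (semicolon >= 0 ? header.substring(0, semicolon) : header).trim();
            int size = Integer.parseInt(hex, 16);
            if (size == 0) {
                // Last chunk: skip optional trailer lines up to the empty line.
                while (!readLine(in).isEmpty()) { }
                return payload.toByteArray();
            }
            byte[] chunk = new byte[size];
            int read = 0;
            while (read < size) {
                int n = in.read(chunk, read, size - read);
                if (n == -1) throw new EOFException("stream ended inside a chunk");
                read += n;
            }
            payload.write(chunk, 0, size);
            // Every chunk's data is followed by CRLF.
            if (!readLine(in).isEmpty()) throw new IOException("expected CRLF after chunk data");
        }
    }

    public static void main(String[] args) throws IOException {
        // Two data chunks (7 bytes, then 1 byte) followed by the terminating zero-size chunk.
        String wire = "7\r\n{\"a\":1}\r\n" + "1\r\n\n\r\n" + "0\r\n\r\n";
        byte[] body = decode(new ByteArrayInputStream(wire.getBytes(StandardCharsets.US_ASCII)));
        System.out.print(new String(body, StandardCharsets.UTF_8));
    }
}
```

On a live stream the decoder spends most of its time waiting for the next chunk-size line, which is the same place (`readChunkSize`) where the timeout in the trace is raised.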

Full stack trace:

java.net.SocketTimeoutException: timeout 
at okio.Okio$3.newTimeoutException(Okio.java:207) 
at okio.AsyncTimeout.exit(AsyncTimeout.java:261) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215) 
at okio.RealBufferedSource.request(RealBufferedSource.java:71) 
at okio.RealBufferedSource.require(RealBufferedSource.java:64) 
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425) 
at okio.RealBufferedSource.read(RealBufferedSource.java:50) 
at okio.ForwardingSource.read(ForwardingSource.java:35) 
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285) 
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: Read timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at okio.Okio$2.read(Okio.java:139) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211) 
... 37 more 
Exception in thread "RxNewThreadScheduler-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: rx.exceptions.OnErrorNotImplementedException: timeout 
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) 
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) 
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) 
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157) 
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:812) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562) 
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:128) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
... 7 more 
Caused by: java.net.SocketTimeoutException: timeout 
at okio.Okio$3.newTimeoutException(Okio.java:207) 
at okio.AsyncTimeout.exit(AsyncTimeout.java:261) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215) 
at okio.RealBufferedSource.request(RealBufferedSource.java:71) 
at okio.RealBufferedSource.require(RealBufferedSource.java:64) 
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425) 
at okio.RealBufferedSource.read(RealBufferedSource.java:50) 
at okio.ForwardingSource.read(ForwardingSource.java:35) 
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285) 
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123) 
... 27 more 
Caused by: java.net.SocketTimeoutException: Read timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at okio.Okio$2.read(Okio.java:139) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211) 
... 37 more 
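
The innermost `Caused by` in the trace above is a plain socket read timeout (`SocketInputStream.socketRead0`, "Read timed out"), raised while waiting for the next chunk-size line. That pattern is what one would expect when a streaming endpoint stays silent for longer than the client's read timeout. OkHttp 3 applies a 10-second read timeout by default, and `OkHttpClient.Builder.readTimeout(0, TimeUnit.MILLISECONDS)` disables it. The sketch below reproduces the effect with plain `java.net` sockets only; the local server, the class name `ReadTimeoutDemo`, and the timings are assumptions made for illustration and do not model the Meetup API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

public class ReadTimeoutDemo {

    /**
     * Connects to a local server that sends one line and then stays silent for
     * quietMillis before sending a second line. Returns "timeout" if the second
     * read hit the socket read timeout, otherwise the second line.
     */
    public static String readSecondLine(int readTimeoutMillis, long quietMillis) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread producer = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    OutputStream out = peer.getOutputStream();
                    out.write("first\n".getBytes(StandardCharsets.UTF_8));
                    out.flush();
                    Thread.sleep(quietMillis); // quiet period: nothing on the wire
                    out.write("second\n".getBytes(StandardCharsets.UTF_8));
                    out.flush();
                } catch (IOException | InterruptedException ignored) {
                    // The client may already have disconnected; nothing to do in this sketch.
                }
            });
            producer.setDaemon(true);
            producer.start();

            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                client.setSoTimeout(readTimeoutMillis); // 0 means wait indefinitely
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
                in.readLine(); // "first" arrives right away
                try {
                    return in.readLine();
                } catch (SocketTimeoutException e) {
                    return "timeout";
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readSecondLine(300, 1000)); // prints "timeout"
        System.out.println(readSecondLine(0, 1000));   // prints "second"
    }
}
```

With a finite read timeout, the error appears after whatever gap happens to exceed it, which would look like a random period from the caller's side.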

Answers

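This may not be related, but further down the stack trace there is a mention of `Add onError handling`. Perhaps you need to add that method to your implementation?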

Thank you for your answer, but unfortunately this solution did not work. – DMJessieSP

I think I implemented onError incorrectly. Could you provide an example of how to do it correctly? I tried to follow this link but could not get it to work: https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators – DMJessieSP

Sorry, I am not familiar with this API. My answer was based entirely on reading the error stack trace.
