0
我想從聚會open_events流API收聽層出不窮的流。出於某種原因,我得到一個隨機時間段之後(似乎是)的socketTimeoutException。我的代碼適用於Meetups rsvps streaming API。我正在使用一個okio.BufferedSource,如下所示。我得到了「while(!source.exhausted())」的例外,但是當寫入「while(true)」時,會在下面一行中給出例外。據我所知,rsvps和open_events之間的區別在於rsvps使用長輪詢並返回數組,而open_events使用分塊傳輸編碼來維護持久連接。問題是我無法使用BufferedSource分塊響應,我應該使用什麼呢?java.net.SocketTimeoutException:在聚集open_events流API時的超時
public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
完整堆棧跟蹤:
java.net.SocketTimeoutException: timeout
at okio.Okio$3.newTimeoutException(Okio.java:207)
at okio.AsyncTimeout.exit(AsyncTimeout.java:261)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215)
at okio.RealBufferedSource.request(RealBufferedSource.java:71)
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.ForwardingSource.read(ForwardingSource.java:35)
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at okio.Okio$2.read(Okio.java:139)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
... 37 more
Exception in thread "RxNewThreadScheduler-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorNotImplementedException: timeout
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:812)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:128)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146)
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:8666)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
... 7 more
Caused by: java.net.SocketTimeoutException: timeout
at okio.Okio$3.newTimeoutException(Okio.java:207)
at okio.AsyncTimeout.exit(AsyncTimeout.java:261)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215)
at okio.RealBufferedSource.request(RealBufferedSource.java:71)
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.ForwardingSource.read(ForwardingSource.java:35)
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123)
... 27 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at okio.Okio$2.read(Okio.java:139)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
... 37 more
謝謝你的回答,但不幸的是,這個解決方案沒有奏效。 – DMJessieSP
我想我錯誤地實現了onError,你能否提供任何如何正確執行它的例子?我試圖按照下面的鏈接,但不能得到它的工作:https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators – DMJessieSP
對不起,我不熟悉這個API。我的回答完全基於讀取錯誤堆棧跟蹤。 –