java.net.SocketTimeoutException: timeout when consuming the Meetup open_events streaming API

Problem description:

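I am trying to listen to the never-ending stream from Meetup's open_events streaming API. For some reason I get a SocketTimeoutException after a (seemingly) random period of time. The same code works for Meetup's rsvps streaming API. I am using an okio.BufferedSource, as shown below. The exception is thrown at "while (!source.exhausted())", but if I write "while (true)" instead, it is thrown on the line below it. As far as I can tell, the difference between rsvps and open_events is that rsvps uses long polling and returns an array, while open_events uses chunked transfer encoding to maintain a persistent connection. Is the problem that I cannot use a BufferedSource for chunked responses? If so, what should I use instead?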

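import java.io.IOException;
import okio.BufferedSource;
import rx.Observable;
import rx.Subscriber;

public static Observable<String> events(final BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                // exhausted() blocks until more data arrives or the read times out.
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
                // Signal completion only if the stream ended without an error.
                subscriber.onCompleted();
            } catch (IOException e) {
                e.printStackTrace();
                subscriber.onError(e);
            }
        }
    });
}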

Full stack trace:

java.net.SocketTimeoutException: timeout 
at okio.Okio$3.newTimeoutException(Okio.java:207) 
at okio.AsyncTimeout.exit(AsyncTimeout.java:261) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215) 
at okio.RealBufferedSource.request(RealBufferedSource.java:71) 
at okio.RealBufferedSource.require(RealBufferedSource.java:64) 
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425) 
at okio.RealBufferedSource.read(RealBufferedSource.java:50) 
at okio.ForwardingSource.read(ForwardingSource.java:35) 
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285) 
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: Read timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at okio.Okio$2.read(Okio.java:139) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211) 
... 37 more 
Exception in thread "RxNewThreadScheduler-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: rx.exceptions.OnErrorNotImplementedException: timeout 
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) 
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) 
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) 
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157) 
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:812) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562) 
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:128) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:118) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:505) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:463) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:246) 
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147) 
at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:146) 
at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:125) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
at rx.Observable.unsafeSubscribe(Observable.java:8666) 
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) 
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
... 7 more 
Caused by: java.net.SocketTimeoutException: timeout 
at okio.Okio$3.newTimeoutException(Okio.java:207) 
at okio.AsyncTimeout.exit(AsyncTimeout.java:261) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:215) 
at okio.RealBufferedSource.request(RealBufferedSource.java:71) 
at okio.RealBufferedSource.require(RealBufferedSource.java:64) 
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444) 
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425) 
at okio.RealBufferedSource.read(RealBufferedSource.java:50) 
at okio.ForwardingSource.read(ForwardingSource.java:35) 
at retrofit2.OkHttpCall$ExceptionCatchingRequestBody$1.read(OkHttpCall.java:285) 
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60) 
at com.example.meetup.MeetupListener$1.call(MeetupListener.java:123) 
... 27 more 
Caused by: java.net.SocketTimeoutException: Read timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at okio.Okio$2.read(Okio.java:139) 
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211) 
... 37 more 
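
The innermost cause in the trace is a plain socket read timeout (`Read timed out`), raised while OkHttp was waiting for the next chunk size. That is consistent with the read timeout expiring whenever the stream is idle for longer than the configured limit, which would also explain the seemingly random timing. In OkHttp the corresponding setting is `OkHttpClient.Builder.readTimeout(...)`, where a value of 0 means no timeout; whether disabling it is the right fix for open_events is an assumption, not something confirmed in this thread. The sketch below reproduces the mechanism with JDK sockets only. The class `IdleStreamTimeoutDemo`, the method `readOneLine`, and the local test server are hypothetical and exist purely for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

final class IdleStreamTimeoutDemo {

    // Hypothetical helper: connects to a local server that stays silent for
    // serverIdleMillis before sending one line, and reads with the given timeout.
    static String readOneLine(int readTimeoutMillis, long serverIdleMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread writer = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    Thread.sleep(serverIdleMillis); // simulate a quiet period in the stream
                    OutputStream out = peer.getOutputStream();
                    out.write("event\n".getBytes(StandardCharsets.UTF_8));
                    out.flush();
                } catch (IOException | InterruptedException ignored) {
                    // The client may already have given up; nothing to do in this sketch.
                }
            });
            writer.setDaemon(true);
            writer.start();

            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                client.setSoTimeout(readTimeoutMillis); // 0 means "wait indefinitely"
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
                return in.readLine();
            } catch (SocketTimeoutException e) {
                return "timed out";
            }
        } catch (IOException e) {
            return "io error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Timeout shorter than the idle gap, then no timeout at all.
        System.out.println(readOneLine(200, 1000));
        System.out.println(readOneLine(0, 1000));
    }
}
```

With a 200 ms timeout and a one-second idle gap, the first call is expected to report a timeout; with the timeout set to 0, the second call waits and returns the line.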

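This may not be relevant, but further down the error stack there is a mention of `Add onError handling`. Perhaps you need to add such a method to your implementation?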
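
The `OnErrorNotImplementedException` in the trace suggests the subscription was made with only an onNext callback. In RxJava 1 the two-argument `subscribe(onNext, onError)` overload supplies the missing handler. Note that this would only stop the secondary crash on the scheduler thread; it would not prevent the underlying timeout. The following is a minimal sketch of the callback contract using plain JDK types so that it runs without RxJava. `LineListener`, `pump`, `record`, and `failingReader` are hypothetical names, not part of any library.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

final class ErrorHandlingSketch {

    // Hypothetical stand-in for the three callbacks of an RxJava 1 Subscriber.
    interface LineListener {
        void onNext(String line);
        void onError(Throwable error);
        void onCompleted();
    }

    // Emits every line, then exactly one terminal event: onError or onCompleted.
    static void pump(Reader source, LineListener listener) {
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                listener.onNext(line);
            }
        } catch (IOException e) {
            listener.onError(e);
            return; // never follow an error with a completion signal
        }
        listener.onCompleted();
    }

    // Records the events as strings so the behaviour is easy to inspect.
    static List<String> record(Reader source) {
        final List<String> events = new ArrayList<>();
        pump(source, new LineListener() {
            @Override public void onNext(String line) { events.add("next:" + line); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    // A reader that fails immediately, standing in for a socket read timeout.
    static Reader failingReader() {
        return new Reader() {
            @Override public int read(char[] buf, int off, int len) throws IOException {
                throw new IOException("timeout");
            }
            @Override public void close() { }
        };
    }

    public static void main(String[] args) {
        System.out.println(record(new StringReader("a\nb\n")));
        System.out.println(record(failingReader()));
    }
}
```

The point of the design is that a listener which implements `onError` receives the failure as an ordinary event instead of letting it escape as an unhandled exception.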

Thanks for your answer, but unfortunately this solution did not work. – DMJessieSP

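I think I implemented onError incorrectly. Could you provide an example of how to do it properly? I tried to follow the link below but could not get it to work: https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators – DMJessieSP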

Sorry, I am not familiar with this API. My answer was based entirely on reading the error stack trace. –