RxJava链式观测量和NetworkMainThreadException

RxJava链式观测量和NetworkMainThreadException

问题描述:

所以我有这样的代码:RxJava链式观测量和NetworkMainThreadException

public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) { 
    return Observable.<AbstractXMPPConnection>create(subscriber -> { 
     try { 
      AbstractXMPPConnection connection2 = connection.connect(); 
      if (connection2.isConnected()) { 
       subscriber.onNext(connection2); 
       subscriber.onCompleted(); 
      } 
     } catch (SmackException | IOException | XMPPException e) { 
      e.printStackTrace(); 
      subscriber.onError(e); 
     } 
    }) 
    .doOnError(throwable -> LOGI("111", "Connection OnError called")); 
} 


public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) { 
     return connect(connection) 
       .retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer)) 
         .flatMap(pair -> { 
          if (pair.second == MAX_LOGIN_TRIES) 
           return Observable.error(pair.first); 
          return Observable.timer(pair.second, TimeUnit.SECONDS); 
         })); 
    } 


public void connect() { 
     assertTrue("To start a connection to the server, you must first call init() method!", 
       this.connectionConfig != null); 

     connectionHelper.connectWithRetry(connection) 
       .observeOn(Schedulers.newThread()) 
       .subscribeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Subscriber<AbstractXMPPConnection>() { 
        @Override 
        public void onCompleted() { 
        } 

        @Override 
        public void onError(Throwable e) { 
         LOGI(TAG, "ConnectionHelper Connection onError\n"); 

         /**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */ 
         MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent()); 
        } 

        @Override 
        public void onNext(AbstractXMPPConnection connection) { 
         LOGI(TAG, "ConnectionHelper Connection onNext"); 
//      onConnected(); 
        } 
       }); 
    } 

我有链接可观察的一些问题。想象这种场景,其中我有一个连接Observable,有时候我使用它,但我主要使用的是可观察的connectWithRetry()

我的问题是,如果添加了此会发生什么:

.observeOn(Schedulers.newThread()) 
.subscribeOn(AndroidSchedulers.mainThread()) 

connect()connectWithRetry()两者兼而有之?在这种情况下,当我拨打 public void connect并指定一个调度程序时,以前的那些会被忽略?

而我为什么得到NetworkOnMainThreadException?明确的observeOn(Schedulers.newThread())在那里,它不应该给我那个错误

我会先解决你的NetworkOnMainThread问题。

observeOn(Schedulers.newThread())意味着输出将在一个新的线程观察 - 也就是说,在用户(onComplete/Error/Next)的代码将在该线程中运行。

subscribeOn(AndroidSchedulers.mainThread()意味着订阅会发生在主线程 - 在代码中您所创建的可观测(connection.connect()等)是订阅,会发生什么运行。

所以,简单地交换调度:

.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 

因此,解决第一个问题,他们没有理会,他们只是被错误地使用。希望从这里你可以看到如果你将类似的调用移动到你的方法中的链中,返回可观察到的结果会发生什么:没有什么不同,你已经做了什么。电话会在不同的地方。

那么在哪里把调度选择?这取决于你。您可以通过具有创建观测方法里面subscribeOn电话获得更高的清晰度:

connectionHelper.connectWithRetry(connection) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 

不过,如果你觉得你调用这个处处为没有理由,可以改为移动subscribeOn打电话给你的方法里面:

return connect(connection) 
      .retryWhen(...) 
      .flatMap(...) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

注意,这些不必是这样捆绑在一起了 - 你可以subscribeOn你的方法里面,但离开observeOn高达希望他们在一个特定的苏氨酸结果的任何呼叫者元首。

+0

好吧,所以考虑到我为每个observable指定了两个调度器,而第三个调度器是链接的,这是错误的吗?或者我应该只是简单地指定调度器只在公共无效连接?虐待接受你的答案,谢谢你的帮助 –

+0

我已经用一些例子更新了答案 - 这真的取决于你放在哪里!不过,我不确定你对第三个人的意思。 –

+0

好的感谢额外的信息,我欣赏它:P我的意见,名字很差的选择。我的意思是,Observable,观察员,Subscriber实施Observer。它就像subscribeOn是主要的问题,因为考虑当你订阅一个observable时,你指定了你想要对结果做什么,subscribeOn误导我们“在你想用结果做什么线程中”。 Imho –

请尝试Schedulers.io()可能会解决问题。