Retrofit2 + RxJava2,无效的令牌,如何更新流时retryWhen()重新订阅

问题描述:

我有下面这个简单的代码来模拟一个场景即时通讯目前试图完成Retrofit2 + RxJava2,无效的令牌,如何更新流时retryWhen()重新订阅

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token") 
      .subscribeOn(Schedulers.io()) 
      .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() { 

       @Override 
       public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception { 
        return mApiService.api().getAccessToken(); 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .subscribe(new Observer<Void>() { 
       @Override 
       public void onSubscribe(Disposable d) { 
       } 

       @Override 
       public void onNext(Void value) { 
       } 

       @Override 
       public void onError(Throwable e) { 

        e.printStackTrace(); 
        onError(e); 
       } 

       @Override 
       public void onComplete() { 
       } 
      }); 

生病只是列举这让我目标明确:

  1. 与当前访问令牌执行POST呼叫
  2. 如果它接收一个适当的错误(404403,401或例如)
  3. 执行GET调用有一个新的访问令牌
  4. 使用新的访问令牌

基于上面的代码,到目前为止,与.retryWhen()我的理解重试整个序列,是否会在原始Observable(。 postSomethingWithAccessToken()),并在必要时(根据内部重试的条件重试),这里所发生的是,.retryWhen()首先执行外可观察到,引起不希望重复请求之前, 我怎么能实现这些基于我目前的理解(代码),上面提到的事情?任何帮助将不胜感激。 :(

编辑:当前的解决办法:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", "")) 
      .subscribeOn(Schedulers.io()) 
      .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { 

       @Override 
       public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception { 

        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { 

         @Override 
         public ObservableSource<?> apply(Throwable throwable) throws Exception { 

          if (throwable instanceof HttpException) { 

           HttpException httpException = (HttpException) throwable; 

           if (httpException.code() == 401) { 

            return mApiService.api().getAccessToken() 
              .doOnNext(new Consumer<Authentication>() { 
               @Override 
               public void accept(Authentication authentication) throws Exception { 
                update(authentication); 
               } 
              }); 
           } 
          } 

          return Observable.error(throwable); 
         } 
        }); 
       } 
      }) 
      .subscribe(new Observer<Void>() { 
       @Override 
       public void onSubscribe(Disposable d) { 
        Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", "")); 
       } 

       @Override 
       public void onNext(Void value) { 
        Log.e("onNext", "TOKEN : " + preferences.getString("access_token", "")); 
       } 

       @Override 
       public void onError(Throwable e) { 
        e.printStackTrace(); 
       } 

       @Override 
       public void onComplete() { 
        Log.e("Complete", "____ COMPLETE"); 
       } 
      }); 

方法是通过共享偏好

public void update(Authentication authentication) { 
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit(); 
} 

我注意到,(我把日志)外观察到的订阅和retryWhen是更新令牌在主线程执行,但重试/重新订阅的流跳过不同的调度程序的线程,这似乎是一个竞争条件:(

onSubscrbie_outer_observable: Thread[main,5,main] 
    RetryWhen: Thread[main,5,main] 
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main] 
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main] 
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main] 
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main] 
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main] 
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main] 
    // and so on... 

这里有几个问题:

  • 你需要访问令牌传递回postSomethingWithAccessToken方法时重试,否则你会只是老一套无效的访问令牌重试。
  • 当逻辑不正确时您重试,您必须对您收到的错误Observable作出响应并将您的重试逻辑放在那里。正如你所说的这种方法是首先执行的,而不是当错误发生时,throwableObservable是什么对错误的响应,它会将错误作为发射来镜像(onNext()),你可以flatMap()每个错误和响应有错误(用于向源发送错误流)完成,或与onNext()与某些对象发信号通知它重试。
    关于这个问题的一个很棒的blog post ban Dan Lew

因此,你需要:
1)存储访问令牌的地方在那里你可以访问令牌刷新更改。
2)固定重试时,逻辑正确错误

这里有一个建议代码回应:

postSomethingWithAccessToken(request, accessToken) 
     .subscribeOn(Schedulers.io()) 
     .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { 
        @Override 
        public ObservableSource<?> apply(
          @NonNull Observable<Throwable> throwableObservable) throws Exception { 
         return throwableObservable.flatMap(
           new Function<Throwable, ObservableSource<? extends R>>() { 
            @Override 
            public ObservableSource<? extends R> apply(
              @NonNull Throwable throwable) throws Exception { 
             if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here 
              return getAccessToken() 
                  .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken)); 
                //or keep accessToken on some field, the point to have mutable 
                //var that you can change and postSomethingWithAccessToken can see 
             } 
             return Observable.error(throwable); 
            } 
           }); 
         } 
        } 
     ) 
     .subscribeOn(Schedulers.io()) 
     .subscribe(new Consumer<Result>() { 
         @Override 
         public void accept(@NonNull Result result) throws Exception { 
          //handle result 
         } 
        } 
     ); 
+0

你好,非常感谢你提供一个代码示例并提及错误点,如果它没有太多要求,你会介意将代码改成非lambda ?, – Robert

+0

没问题,编辑 – yosriz

+0

非常感谢!生病了,生病尝试重做我的代码。 :),谢谢你的链接:) – Robert

非常感谢yosriz,因为他向我指出了正确的方向来解决我的牙齿磨的问题,我必须使用defer。所以我结束了这个问题GitHub,Why resubscribe the source observable emit same output when I use retryWhen operator?

这是我现在正好相同的问题,因为任何人遇到同样的问题在这里是我的解决方案。

Observable 
    .defer(new Callable<ObservableSource<?>>() { 
     @Override 
     public ObservableSource<?> call() throws Exception { 
      // return an observable source here, the observable that will be the source of the entire stream; 
     } 
    }) 
    .subscribeOn(/*target thread to run*/) 
    .retryWhen({ 
     // return a throwable observable here that will perform the logic when an error occurred 
    }) 
    .subscribe(/*subscription here*/) 

或这里是我的解决方案的全部非拉姆达

Observable 
    .defer(new Callable<ObservableSource<?>>() { 
     @Override 
     public ObservableSource<?> call() throws Exception { 
      return mApiService.api().postSomethingWithAccessToken(
       request, preferences.getString("access_token", "")); 
     } 
    }) 
    .subscribeOn(Schedulers.io()) 
    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { 
     @Override 
     public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception { 
      return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { 
       @Override 
       public ObservableSource<?> apply(Throwable throwable) throws Exception { 
        if (throwable instanceof HttpException) { 
         HttpException httpException = (HttpException) throwable; 
         if (httpException.code() == 401) { 
          return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() { 
            @Override 
            public void accept(Authentication authentication) throws Exception { 
             update(authentication); 
            } 
           }); 
         } 
        } 
        return Observable.error(throwable); 
       } 
      }); 
     } 
    }) 
    .subscribe(new Observer<Void>() { 
     @Override 
     public void onSubscribe(Disposable d) { 
      Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", "")); 
     } 

     @Override 
     public void onNext(Void value) { 
      Log.e("onNext", "TOKEN : " + preferences.getString("access_token", "")); 
     } 

     @Override 
     public void onError(Throwable e) { 
      e.printStackTrace(); 
     } 

     @Override 
     public void onComplete() { 
      Log.e("Complete", "____ COMPLETE"); 
     } 
    }); 

这里的关键点是“如何修改/更新现有的源观察到当.retryWhen()运营商重新订阅源可观察的”

我想在这里解决同样的问题,我尝试复制上面的解决方案,它刷新了令牌,但没有试图在我的令牌更新时重试请求。

这里是我的代码,而无需拉姆达:

public Observable<Estabelecimento> listarEstabelecimentos() { 
    return Observable.defer(new Callable<ObservableSource<? extends Estabelecimento>>() { 
     @Override 
     public ObservableSource<? extends Estabelecimento> call() throws Exception { 
      return mGetNetAPI.listarEstabelecimento() 
        .map(mNetworkErrorHandler::processError); 
     } 
    }).retryWhen(throwableObservable -> throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { 
                     @Override 
                     public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception { 
                      if (throwable instanceof UnauthorizedException) { 
                       return mRequestManager.getTokenObservable(AutoAtendimentoApplication.getContext()) 
                         .doOnNext(new Consumer<AuthResponse>() { 
                          @Override 
                          public void accept(@NonNull AuthResponse response) throws Exception { 
                           Log.i("NEXT", "OK"); 
                          } 
                         }).doOnError(new Consumer<Throwable>() { 
                          @Override 
                          public void accept(@NonNull Throwable throwable) throws Exception { 
                           Log.i("ONERROR", "NOT OK"); 
                          } 
                         }); 

                      } 

                      return Observable.error(throwable); 
                     } 
                    } 
    )); 
} 

任何ideias什么,我可能是做错了?

+0

嗨,我不知道如果我正确地理解它,因为我也是RxJava的初学者,但是我可以看到你的代码是第一个,外部observable没有订户(不知道这个),我想它是因为你的doOnError throwableObservable。 – Robert

+0

尝试省略它,如果它确实解决了问题,我认为它是因为在我们的例子中throwableObservable应该在接收到错误时处理/作出反应,并将流传回原始可观察对象,而不是直接在其内执行错误操作 – Robert

+0

多数民众赞成在@罗伯特,感谢您的提示! –