Retrofit2 + RxJava2,无效的令牌,如何更新流时retryWhen()重新订阅
我有下面这个简单的代码来模拟一个场景即时通讯目前试图完成Retrofit2 + RxJava2,无效的令牌,如何更新流时retryWhen()重新订阅
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
生病只是列举这让我目标明确:
- 与当前访问令牌执行POST呼叫
- 如果它接收一个适当的错误(404403,401或例如)
- 执行GET调用有一个新的访问令牌
- 使用新的访问令牌
基于上面的代码,到目前为止,与.retryWhen()我的理解重试整个序列,是否会在原始Observable(。 postSomethingWithAccessToken()),并在必要时(根据内部重试的条件重试),这里所发生的是,.retryWhen()首先执行外可观察到,引起不希望重复请求之前, 我怎么能实现这些基于我目前的理解(代码),上面提到的事情?任何帮助将不胜感激。 :(
编辑:当前的解决办法:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
方法是通过共享偏好
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
我注意到,(我把日志)外观察到的订阅和retryWhen是更新令牌在主线程执行,但重试/重新订阅的流跳过不同的调度程序的线程,这似乎是一个竞争条件:(
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...
这里有几个问题:
- 你需要访问令牌传递回
postSomethingWithAccessToken
方法时重试,否则你会只是老一套无效的访问令牌重试。 - 当逻辑不正确时您重试,您必须对您收到的错误
Observable
作出响应并将您的重试逻辑放在那里。正如你所说的这种方法是首先执行的,而不是当错误发生时,throwableObservable
是什么对错误的响应,它会将错误作为发射来镜像(onNext()
),你可以flatMap()
每个错误和响应有错误(用于向源发送错误流)完成,或与onNext()
与某些对象发信号通知它重试。
关于这个问题的一个很棒的blog post ban Dan Lew。
因此,你需要:
1)存储访问令牌的地方在那里你可以访问令牌刷新更改。
2)固定重试时,逻辑正确错误
这里有一个建议代码回应:
postSomethingWithAccessToken(request, accessToken)
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(
@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(
new Function<Throwable, ObservableSource<? extends R>>() {
@Override
public ObservableSource<? extends R> apply(
@NonNull Throwable throwable) throws Exception {
if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
return getAccessToken()
.doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
//or keep accessToken on some field, the point to have mutable
//var that you can change and postSomethingWithAccessToken can see
}
return Observable.error(throwable);
}
});
}
}
)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Result>() {
@Override
public void accept(@NonNull Result result) throws Exception {
//handle result
}
}
);
非常感谢yosriz,因为他向我指出了正确的方向来解决我的牙齿磨的问题,我必须使用defer
。所以我结束了这个问题GitHub,Why resubscribe the source observable emit same output when I use retryWhen operator?
这是我现在正好相同的问题,因为任何人遇到同样的问题在这里是我的解决方案。
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
// return an observable source here, the observable that will be the source of the entire stream;
}
})
.subscribeOn(/*target thread to run*/)
.retryWhen({
// return a throwable observable here that will perform the logic when an error occurred
})
.subscribe(/*subscription here*/)
或这里是我的解决方案的全部非拉姆达
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return mApiService.api().postSomethingWithAccessToken(
request, preferences.getString("access_token", ""));
}
})
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
这里的关键点是“如何修改/更新现有的源观察到当.retryWhen()
运营商重新订阅源可观察的”
我想在这里解决同样的问题,我尝试复制上面的解决方案,它刷新了令牌,但没有试图在我的令牌更新时重试请求。
这里是我的代码,而无需拉姆达:
public Observable<Estabelecimento> listarEstabelecimentos() {
return Observable.defer(new Callable<ObservableSource<? extends Estabelecimento>>() {
@Override
public ObservableSource<? extends Estabelecimento> call() throws Exception {
return mGetNetAPI.listarEstabelecimento()
.map(mNetworkErrorHandler::processError);
}
}).retryWhen(throwableObservable -> throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
if (throwable instanceof UnauthorizedException) {
return mRequestManager.getTokenObservable(AutoAtendimentoApplication.getContext())
.doOnNext(new Consumer<AuthResponse>() {
@Override
public void accept(@NonNull AuthResponse response) throws Exception {
Log.i("NEXT", "OK");
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.i("ONERROR", "NOT OK");
}
});
}
return Observable.error(throwable);
}
}
));
}
任何ideias什么,我可能是做错了?
你好,非常感谢你提供一个代码示例并提及错误点,如果它没有太多要求,你会介意将代码改成非lambda ?, – Robert
没问题,编辑 – yosriz
非常感谢!生病了,生病尝试重做我的代码。 :),谢谢你的链接:) – Robert