如何检测可观察到的所有包含的观测量完成<可观察
我想在我的API的方法返回可观察<可观察<对象> >,但我想在这种方法的代码知道一旦所有包含的观测量已完成,因此它可以关闭一些东西。做这个的最好方式是什么?如何检测可观察到的所有包含的观测量完成<可观察<Object>>
更明确的,我之后要完成这个方法:
public static <T> Observable<Observable<T>> doWhenAllComplete(
final Observable<Observable<T>> original, Action0 action) {
...
}
道歉,我的答案是.NET(因为是system.reactive标签);我相信你可以翻译它!
如果您IObservable<IObservable<Object>>
由source
然后给出:
source.Merge()
.Subscribe(_ => {}, /* not interested in onNext */
() => /* onCompleted action here, called when all complete */);
注:这将打破,如果任何流的错误(导致合并的流在该点结束),所以你也可以这样做对各个流吞下错误:
source.SelectMany(x => x.Catch(Observable.Empty<Object>()))
.Subscribe(_ => {}, /* not interested in onNext */
() => /* onCompleted action here, called when all complete */);
您假定Observable
虽然我认为合并方法将成为实现这一目标的关键,但将您的答案作为提示处理 –
我没有做出这样的假设 - 您只是要求知道所有的流何时完成,所以这就是我给你的。这听起来像也许你有一些额外的信息来添加到你的问题?要做的很好的事情是添加一个测试或测试,解决方案应该通过。看看你添加了什么,为什么你需要通过源代码?两个单独的订阅会不会这样做?它也可以避免副作用。 –
方法的这种实现似乎这样的伎俩没有副作用,我相信:
public static <T> Observable<Observable<T>> doWhenAllComplete(
final Observable<Observable<T>> original, final Action0 action) {
return Observable.create(new OnSubscribeFunc<Observable<T>>() {
@Override
public Subscription onSubscribe(Observer<? super Observable<T>> o) {
ConnectableObservable<Observable<T>> published = original
.publish();
Subscription sub1 = Observable.merge(published)
.doOnCompleted(action).subscribe();
Subscription sub2 = published.subscribe(o);
Subscription sub3 = published.connect();
return Subscriptions.from(sub1, sub2, sub3);
}
});
}
对于我来说,这个工程:
bothSources = source1.Cast<Object>().Merge (source2.Cast<Object>());
对我来说,我只需要等待2名的来源,但你可以创建一个接收源列表,并合并所有这些功能。
向我们展示了更多代码来更好地理解您的问题 – hoaz
这实际上取决于您的API方法如何创建包含的observables。你能从你的方法发布一些代码来产生'Observable>'吗?然后我们会知道这些内部可观察量来自哪里以及该方法跟踪其完成的最佳方式。 –
Brandon