如何创建一个可观察的另一个结束后,并结合结果
问题描述:
Observable<T1> a
Supplier<Observable<T2>> p
Function<T1, R> f
-
Function<T2, R> g
。
我想排序a
和p.get()
如下:
- 等待
a
完成,然后调用Observable<T2> b = p.get()
- 地图
a
和b
值使用f
和g
键入
- 将结果作为
Observable<R>
返回 - 当
a
或b
失败时结果应该失败 - 在
b
完成后应该完成。
R
这是我到目前为止已经试过(忽略f
和g
):
public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> p) {
final Subject<T> subject = PublishSubject.create();
a.subscribe(
subject::onNext,
subject::onError,
() -> {
p.get().subscribe(
subject::onNext,
subject::onError,
subject::onComplete);
});
return subject;
}
我应该如何实现呢?
答
public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> f) {
return a.publish(i -> Observable.merge(
i,
i.lastOrError().flatMapObservable(f::apply));
}
答
现在我没有IDE,所以我不确定这段代码实际编译。但这个想法是这样的:
a // your first observable
.map(f::apply) // map first result to R
.flatMap(r1 -> p.get() // "concat" second observable
.map(g::apply) // map result result to R
.map(r2 -> {
// some kind of operation between r1 and r2
})
)
.subscribe(next -> {
// do something with value
}, error -> {
// error from either observable
},() -> {
// completed!
});
如果f
计算是相当昂贵的,你只是想这样做,如果第二观察到。如果您需要的没有失败,你可以把它改成
a // your first observable
.flatMap(r1 -> p.get() // "concat" second observable
.map(g::apply) // map result result to R
.map(r2 -> {
R valueFromFirstObservable = f.apply(r1);
// some kind of operation between r1 and r2
})
)
.subscribe(next -> {
// do something with value
}, error -> {
// error from either observable
},() -> {
// completed!
});
首先观察到的BU你需要的第一个所有项目完全启动第二个前完成,你可以使用toList()
:
a // your first observable
.map(f::apply) // map first result to R
.toList() // by converting to a List you are forcing the observable to finish before continuing
.flatMap(r1Items -> p.get() // "concat" second observable
.map(g::apply) // map result result to R
.toList() // wait until p.get() finishes. Remove this line if you want to emit for all values
.map(r2Items -> {
// Some kind of operation between r1Items and r2Items
// Beware that now they are not of type R but List<R>
})
)
.subscribe(next -> {
// do something with value
}, error -> {
// error from either observable
},() -> {
// completed!
});
你尝试过什么吗?检查'map'和'flatMap'方法 – Pelocho
添加了我试过的东西。 “主题”似乎是错误的做法。 – sdgfsdh
确实如此。真正的答案比这个简单得多。当我写它时,坚持下去 – Pelocho