执行逻辑时RxJS 5的refcount()从源
问题描述:
连接到或取消订阅按照RxJS 5手册关于Multicasting执行逻辑时RxJS 5的refcount()从源
...我们可以使用ConnectableObservable的引用计数()方法(参考计数),它返回部一个可以跟踪它拥有多少用户的Observable。当用户数量从0增加到1时,它会为我们调用connect(),这会启动共享执行。只有订户数量从1减少到0时,它才会完全取消订阅,停止进一步执行。
我想了解是否有可能钩到这些事件,并执行一些逻辑,源可观察的connect()
或unsubscribe()
发生之前理想,但即使在事后是可以接受的。
如果在使用refCount()
运算符时没有办法执行此操作,如果能够提供一个示例,说明如何使用自定义运算符实现此目的,那么将非常感激。
我想也许我可以用do(nextFn,errFn,completeFn)
中的completeFn钩住这个,但似乎没有工作,如下面的代码片段所示。
var source = Rx.Observable.interval(500)
.do(
(x) => console.log('SOURCE emitted ' + x),
(err) => console.log('SOURCE erred ' + err),
() => console.log('SOURCE completed ')
);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
答
您可以使用.do(null,null, onComplete)
组合完成后,您的实际流和.finally()
前/退订有认购事项前及完成/取消之后的事件:
const source = Rx.Observable.empty()
.do(null,null,() => console.log('subscribed'))
.concat(Rx.Observable.interval(500))
.finally(() => console.log('unsubscribed'))
.publish().refCount();
const sub1 = source
.take(5)
.subscribe(
val => console.log('sub1 ' + val),
null,
() => console.log('sub1 completed')
);
const sub2 = source
.take(3)
.subscribe(
val => console.log('sub2 ' + val),
null,
() => console.log('sub2 completed')
);
// simulate late subscription setting refCount() from 0 to 1 again
setTimeout(() => {
source
.take(1)
.subscribe(
val => console.log('late sub3 val: ' + val),
null,
() => console.log('sub3 completed')
);
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
什么输出你期望从你做的例子?我想这是你正在寻找的。 – martin