执行逻辑时RxJS 5的refcount()从源

执行逻辑时RxJS 5的refcount()从源

问题描述:

连接到或取消订阅按照RxJS 5手册关于Multicasting执行逻辑时RxJS 5的refcount()从源

...我们可以使用ConnectableObservable的引用计数()方法(参考计数),它返回部一个可以跟踪它拥有多少用户的Observable。当用户数量从0增加到1时,它会为我们调用connect(),这会启动共享执行。只有订户数量从1减少到0时,它才会完全取消订阅,停止进一步执行。

我想了解是否有可能钩到这些事件,并执行一些逻辑,源可观察的connect()unsubscribe()发生之前理想,但即使在事后是可以接受的。

如果在使用refCount()运算符时没有办法执行此操作,如果能够提供一个示例,说明如何使用自定义运算符实现此目的,那么将非常感激。

我想也许我可以用do(nextFn,errFn,completeFn)中的completeFn钩住这个,但似乎没有工作,如下面的代码片段所示。

var source = Rx.Observable.interval(500) 
 
    .do(
 
    (x) => console.log('SOURCE emitted ' + x), 
 
    (err) => console.log('SOURCE erred ' + err), 
 
    () => console.log('SOURCE completed ') 
 
); 
 
var subject = new Rx.Subject(); 
 
var refCounted = source.multicast(subject).refCount(); 
 
var subscription1, subscription2, subscriptionConnect; 
 

 
// This calls `connect()`, because 
 
// it is the first subscriber to `refCounted` 
 
console.log('observerA subscribed'); 
 
subscription1 = refCounted.subscribe({ 
 
    next: (v) => console.log('observerA: ' + v) 
 
}); 
 

 
setTimeout(() => { 
 
    console.log('observerB subscribed'); 
 
    subscription2 = refCounted.subscribe({ 
 
    next: (v) => console.log('observerB: ' + v) 
 
    }); 
 
}, 600); 
 

 
setTimeout(() => { 
 
    console.log('observerA unsubscribed'); 
 
    subscription1.unsubscribe(); 
 
}, 1200); 
 

 
// This is when the shared Observable execution will stop, because 
 
// `refCounted` would have no more subscribers after this 
 
setTimeout(() => { 
 
    console.log('observerB unsubscribed'); 
 
    subscription2.unsubscribe(); 
 
}, 2000);
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

+0

什么输出你期望从你做的例子?我想这是你正在寻找的。 – martin

您可以使用.do(null,null, onComplete)组合完成后,您的实际流和.finally()前/退订有认购事项前及完成/取消之后的事件:

const source = Rx.Observable.empty() 
 
    .do(null,null,() => console.log('subscribed')) 
 
    .concat(Rx.Observable.interval(500)) 
 
    .finally(() => console.log('unsubscribed')) 
 
    .publish().refCount(); 
 

 
const sub1 = source 
 
    .take(5) 
 
    .subscribe(
 
    val => console.log('sub1 ' + val), 
 
    null, 
 
    () => console.log('sub1 completed') 
 
    ); 
 
const sub2 = source 
 
    .take(3) 
 
    .subscribe(
 
    val => console.log('sub2 ' + val), 
 
    null, 
 
    () => console.log('sub2 completed') 
 
); 
 

 
// simulate late subscription setting refCount() from 0 to 1 again      
 
setTimeout(() => { 
 
    source 
 
    .take(1) 
 
    .subscribe(
 
     val => console.log('late sub3 val: ' + val), 
 
     null, 
 
    () => console.log('sub3 completed') 
 
    ); 
 
    
 
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>