RxJs 5 share()运算符是如何工作的?

RxJs 5 share()运算符是如何工作的?

问题描述:

对于我来说,它不是100%清楚如何RxJs 5 share()操作员工作,请参阅这里latest docs。 Jsbin的问题hereRxJs 5 share()运算符是如何工作的?

如果我创建一个可观察到的一系列的0至2,一秒分隔每个值:

var source = Rx.Observable.interval(1000) 
.take(5) 
.do(function (x) { 
    console.log('some side effect'); 
}); 

如果我创建了两个订户此观察到:

source.subscribe((n) => console.log("subscriptor 1 = " + n)); 
source.subscribe((n) => console.log("subscriptor 2 = " + n)); 

我得到这在控制台中:

"some side effect ..." 
"subscriptor 1 = 0" 
"some side effect ..." 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"some side effect ..." 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"some side effect ..." 
"subscriptor 2 = 2" 

我以为每个订阅都会订阅相同的Observable,但似乎并非如此!它就像订阅行为创建一个完全独立的Observable!

但如果share()经营者添加到源观察到:

var source = Rx.Observable.interval(1000) 
.take(3) 
.do(function (x) { 
    console.log('some side effect ...'); 
}) 
.share(); 

然后我们得到这个:

"some side effect ..." 
"subscriptor 1 = 0" 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"subscriptor 2 = 2" 

这是我所期望的没有share()

这是怎么回事,share()运营商是如何工作的?每个订阅是否创建一个新的Observable链?

要小心您使用RxJS v5,而您的文档链接似乎是RxJS v4。我不记得具体细节,但我认为share运营商经历了一些变化,特别是当涉及到完成和重新订阅时,但不要听我的话。

回到您的问题,正如您在研究中所显示的那样,您的期望与库设计不符。观察者懒洋洋地实例化他们的数据流,当用户订阅时具体地启动数据流。当第二个订阅者订阅相同的可观察数据时,另一个新的数据流就会开始,就好像它是第一个订阅者(所以是的,每个订阅都会创建一个新的观察链)。这就是RxJS术语中创建的一个冷观测值,这是RxJS可观测值的默认行为。如果你想要一个observable将数据发送给数据到达的用户,那么这是一个热门的可观察事物,而获得热门可观察数据的一种方法是使用share运算符。

您可以在这里找到说明的订阅和数据流:Hot and Cold observables : are there 'hot' and 'cold' operators?(这对于RxJS v4有效,但大部分对v5有效)。

份额使得可观察到的 “热”,如果这2个条件得到满足:

  1. 订户> 0
  2. 和可观察到的还没有完成

Scenario1的数目:订户数> 0并且在新订阅之前未观察到可观察

var shared = rx.Observable.interval(5000).take(2).share(); 
var startTime = Date.now(); 
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 3000); 

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds 
// another emission for both observers at: startTime + 10 seconds 

情景2:在订阅新订阅之前,订阅者数量为零。成为“冷”

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer1.unsubscribe(); 
}, 1000); 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time 
}, 3000); 
// observer2's onNext is called at startTime + 8 seconds 
// observer2's onNext is called at startTime + 13 seconds 

情况3:何时可观察是否在新订阅前完成。变得“冷”

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
     console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
    }; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 12000); 

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs 
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs