Rxjs - 只有第一个观察者可以看到从observable.share数据()

Rxjs - 只有第一个观察者可以看到从observable.share数据()

问题描述:

我有一些代码段如下Rxjs - 只有第一个观察者可以看到从observable.share数据()

var videosNeedFix = Rx.Observable.fromArray(JSON.parse(fs.readFileSync("videoEntries.json"))).share(); 

videosNeedFix.count().subscribe(function(count){ //subscrption A 
    console.log(count + " in total"); 
}); 


videosNeedFix.subscribe(function(videoEntry){ //subscription B 
    console.log(videoEntry.id, videoEntry.name, videoEntry.customFields); 
}); 

的videoEntries.json是的VideoEntry对象的JSON序列化阵列。我期待订阅A和订阅B都能收到由videosNeedFix observable发出的数据。

但是,根据控制台日志,只有订阅A将接收数据,但不会收到subscriptionB。如果我交换制作这两个订阅的顺序,只有subscriptionB才会看到这些数据。观察数据如何仅向第一次订阅发布数据?

+4

在第一'订阅( )',共享订阅在第二个'subscribe()'被调用之前被创建和完成。所以第二个用户只收到一个完成的事件。你可以看到,这是因为添加延迟将使它工作...''Rx.Observable.fromArray(JSON.parse(fs.readFileSync(“videoEntries.json”)))。delay(1000).share( );' – 2014-09-03 01:05:02

+0

这是[Cold vs Hot Observable](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables)问题。或者代替共享,请使用发布/收集。 – mtsr 2014-11-08 22:13:52

这是一个很好的用例(也许是唯一的 - 见To Use Subject Or Not To Use Subject?)的Rx.Subject

请看下面的例子。此代码(与.delay()黑客在评论中提到的)的工作,但似乎有点哈克对我说:

let stream$ = Rx.Observable 
     .return(updatesObj) 
     .map(obj => Object.assign({}, obj.localData, obj.updates)) 
     .delay(1) //Hacky way of making it work 
     .share() 

    stream$ 
     .flatMap(obj => Observable.fromPromise(AsyncStorage.setItem('items', JSON.stringify(obj)))) 
     .catch(Observable.return(false)) 
     .subscribe() 

     stream$ 
     .subscribe(obj => dispatch(dataIsReady(obj))) 

例与Rx.Subjects:

let subjS = new Rx.Subject() 

    let stream$ = subjS 
    .map(obj => Object.assign({}, obj.localData, obj.updates)) 
    .share() 

    stream$ 
    .flatMap(obj => Observable.fromPromise(AsyncStorage.setItem('items', JSON.stringify(obj)))) 
    .catch(Observable.return(false)) 
    .subscribe() 

    stream$ 
    .subscribe(obj => dispatch(dataIsReady(obj))) 

    subjS.onNext(updatesObj)