RxJS 5角2:通过前面的结果

RxJS 5角2:通过前面的结果

问题描述:

确定在我Angular 2typescript 2应用程序时间表重播订阅,我查询服务器需要定期更新的值。更新之间的延迟是可变的(服务器发送过期日期和值)。RxJS 5角2:通过前面的结果

我无法构成一个可观察数据流,将重播(启动对服务器的新的呼叫)一旦自动在当前值达到过期。我至今没有规模可言:

price = 5; //initial value is known 
expires = ...;//initial expiration is known 

getData(){ 
    // server returns {expires:number, price:number} 
    this.http.get('...').map(res => res.json()) 
} 

Observable.timer(expires-Date.now()) // when initial price expires 
    .switchMap(()=>this.getData()) // fetch new price and expiration 
    .subscribe(data => 
     { 
      this.price = data.price; 
      Observable.timer(data.expires-Date.now()) //redo when price expires 
       .switchMap(()=>getData()) 
       .subscribe(...) //callback hell (endless inner blocks) 
     } 
    ); 

必须有一个更好的方式来安排后续调用

我的做法是使用switchMap()以及timer()来延迟可观察链的开始,并且repeat()在前一个价格到期后继续寻找新的数据:

price = 0; //initial price 
_exp = 0; //initial delay before fetching data 

/** Observable will emit after the expiration delay */ 
wait() {return Observable.timer(this._exp*1000)} 

stream$ = Observable.of(null) 
    .switchMap(()=>this.wait()) // wait for the expiration delay     
    .switchMap(()=>this.getServerData()) // get fresh data 
    .do(e=>{this._exp = e.expires}) //update expiration    
    .repeat() // repeat until the calling code unsubscribes 

当我订阅时,立即取第一个价格,并且序列无限期重复,每个周期延迟expires秒。到达时,我可以更新与价格模型:

ngOnInit(){ 
    this.stream$.subscribe(e=>this.price = e.price); 
} 

Live demo

呼叫doObservableStuff最初,当以往任何时候都练琴

getData(){ 
    // server returns {expires:number, price:number} 
    this.http.get('...').map(res => res.json()) 
    .subscribe(data => 
     { 
      this.price = data.price; 
      doObservableStuff(data.expires-Date.now()) 
     }); 
} 

doObservableStuff(time){ 
    Observable.timer(time) 
    .switchMap(() => this.getData()) 
} 
+0

谢谢你的创造性的解决方案。 'getData()'实际上是在一个不同的服务中('DataService'),当前模块依赖于其他模块并且依赖于相同的'getData()',所以我不想从'doObservableStuff()'调用'doObservableStuff该方法,并且该方法与'this.price'不在同一个范围内;这两种方法应该是独立的。 – BeetleJuice