RxJS 5角2:通过前面的结果
问题描述:
确定在我Angular 2
typescript 2
应用程序时间表重播订阅,我查询服务器需要定期更新的值。更新之间的延迟是可变的(服务器发送过期日期和值)。RxJS 5角2:通过前面的结果
我无法构成一个可观察数据流,将重播(启动对服务器的新的呼叫)一旦自动在当前值达到过期。我至今没有规模可言:
price = 5; //initial value is known
expires = ...;//initial expiration is known
getData(){
// server returns {expires:number, price:number}
this.http.get('...').map(res => res.json())
}
Observable.timer(expires-Date.now()) // when initial price expires
.switchMap(()=>this.getData()) // fetch new price and expiration
.subscribe(data =>
{
this.price = data.price;
Observable.timer(data.expires-Date.now()) //redo when price expires
.switchMap(()=>getData())
.subscribe(...) //callback hell (endless inner blocks)
}
);
必须有一个更好的方式来安排后续调用
答
我的做法是使用switchMap()
以及timer()
来延迟可观察链的开始,并且repeat()
在前一个价格到期后继续寻找新的数据:
price = 0; //initial price
_exp = 0; //initial delay before fetching data
/** Observable will emit after the expiration delay */
wait() {return Observable.timer(this._exp*1000)}
stream$ = Observable.of(null)
.switchMap(()=>this.wait()) // wait for the expiration delay
.switchMap(()=>this.getServerData()) // get fresh data
.do(e=>{this._exp = e.expires}) //update expiration
.repeat() // repeat until the calling code unsubscribes
当我订阅时,立即取第一个价格,并且序列无限期重复,每个周期延迟expires
秒。到达时,我可以更新与价格模型:
ngOnInit(){
this.stream$.subscribe(e=>this.price = e.price);
}
答
呼叫doObservableStuff最初,当以往任何时候都练琴
getData(){
// server returns {expires:number, price:number}
this.http.get('...').map(res => res.json())
.subscribe(data =>
{
this.price = data.price;
doObservableStuff(data.expires-Date.now())
});
}
doObservableStuff(time){
Observable.timer(time)
.switchMap(() => this.getData())
}
谢谢你的创造性的解决方案。 'getData()'实际上是在一个不同的服务中('DataService'),当前模块依赖于其他模块并且依赖于相同的'getData()',所以我不想从'doObservableStuff()'调用'doObservableStuff该方法,并且该方法与'this.price'不在同一个范围内;这两种方法应该是独立的。 – BeetleJuice