RxJs:轮询,直到间隔进行,或者正确的数据接收

RxJs:轮询,直到间隔进行,或者正确的数据接收

问题描述:

我如何与RxJs的浏览器中执行以下情形:RxJs:轮询,直到间隔进行,或者正确的数据接收

  • 提交数据排队等候处理
  • 取回作业ID
  • 调查另一个端点每1秒,直到结果可用或60秒已通过(再失败)

中级解决方案,我想出来的:

Rx.Observable 
    .fromPromise(submitJobToQueue(jobData)) 
    .flatMap(jobQueueData => 
     Rx.Observable 
      .interval(1000) 
      .delay(5000) 
      .map(_ => jobQueueData.jobId) 
      .take(55) 
    ) 
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId))) 
    .filter(result => result.completed) 
    .subscribe(
     result => console.log('Result', result), 
     error => console.log('Error', error) 
    ); 
  1. 有没有一种方法可以在数据到达或发生错误时停止定时器?我现在可以引入新的observable,然后使用takeUntil
  2. flatMap这里用法的语义正确吗?也许这整个事情应该重写,而不是与flatMap链接?

从顶部开始,你有承诺,你变成了可观察。一旦这个值产生一个值,你希望每秒拨打一次电话,直到你收到特定的响应(成功)或者经过一定的时间。我们可以这样解释每个部分映射到RX方法:

“一旦这会产生一个值” = map/flatMap(在这种情况下flatMap因为接下来会也将是可观的,我们需要扁平化出来)

“每秒一次”= interval

“收到一定的响应”= filter

“或”= amb

“的时候一定已经过去了” = timer

从这里,我们可以拼凑出它像这样:

Rx.Observable 
    .fromPromise(submitJobToQueue(jobData)) 
    .flatMap(jobQueueData => 
    Rx.Observable.interval(1000) 
     .flatMap(() => pollQueueForResult(jobQueueData.jobId)) 
     .filter(x => x.completed) 
     .take(1) 
     .map(() => 'Completed') 
     .amb(
     Rx.Observable.timer(60000) 
      .flatMap(() => Rx.Observable.throw(new Error('Timeout'))) 
    ) 
) 
    .subscribe(
    x => console.log('Result', x), 
    x => console.log('Error', x) 
) 
; 

一旦我们得到了我们的初步结果,我们预计分为两个之间的竞争可观察到的东西,当它收到成功的回应时会产生价值的东西,以及在一定时间过去后会产生价值的东西。第二个flatMap这是因为.throw不存在于可观察的实例上,并且Rx.Observable上的方法返回一个观察值,该观察值也需要展平。

事实证明,在amb/timer组合实际上可通过timeout代替,就像这样:

Rx.Observable 
    .fromPromise(submitJobToQueue(jobData)) 
    .flatMap(jobQueueData => 
    Rx.Observable.interval(1000) 
     .flatMap(() => pollQueueForResult(jobQueueData.jobId)) 
     .filter(x => x.completed) 
     .take(1) 
     .map(() => 'Completed') 
     .timeout(60000, Rx.Observable.throw(new Error('Timeout'))) 
) 
    .subscribe(
    x => console.log('Result', x), 
    x => console.log('Error', x) 
) 
; 

我省略了.delay您的样品中有,因为它是不是你所需要的逻辑描述,但它可以适用于这个解决方案。

因此,要直接回答你的问题:

  1. 在上面的代码有没有需要手动停止任何东西,因为interval将被设置在用户数量下降到零的时刻,这将发生当take(1)amb/timeout完成。
  2. 是的,你原来的两种用法都是有效的,因为在这两种情况下,你都将可观察元素的每个元素都投影到一个新的可观察元素中,并且希望将可观察到的可观察元素展平成一个常规可观察元素。

Here's the jsbin我扔一起测试溶液(你可以调整在pollQueueForResult返回到获得期望的成功/超时值;倍已经由10快速测试起见划分)。

+0

感谢您的详细解释! – gerasalus

+0

@ matt-burnell非常好的答案,这让我无法结束!你有关于做指数后退的任何提示吗? –

+0

@VegardLarsen那么,在这种情况下,如果你想退出轮询间隔,你所需要做的就是用你想要的值替换间隔(1000)流。例如,您可以使用Observable.just,delay和merge的组合来创建一个流,该流将在1,2,4,8,16和32秒标记处产生值(极低技术含量;可以是当然写一个函数来表达这个更优雅)。 –

来自@ matt-burnell的优秀答案的一个小优化。您可以更换过滤采取运营商与第一操作员

Rx.Observable 
    .fromPromise(submitJobToQueue(jobData)) 
    .flatMap(jobQueueData => 
    Rx.Observable.interval(1000) 
     .flatMap(() => pollQueueForResult(jobQueueData.jobId)) 
     .first(x => x.completed) 
     .map(() => 'Completed') 
     .timeout(60000, Rx.Observable.throw(new Error('Timeout'))) 

) 
    .subscribe(
    x => console.log('Result', x), 
    x => console.log('Error', x) 
); 

此外如下,供人可能不知道,flatMap运营商是别名mergeMap在RxJS 5.0。

+0

根据https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md flatMap仍然在RxJS 5中有效 – gerasalus