RxJs:轮询,直到间隔进行,或者正确的数据接收
我如何与RxJs的浏览器中执行以下情形:RxJs:轮询,直到间隔进行,或者正确的数据接收
- 提交数据排队等候处理
- 取回作业ID
- 调查另一个端点每1秒,直到结果可用或60秒已通过(再失败)
中级解决方案,我想出来的:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable
.interval(1000)
.delay(5000)
.map(_ => jobQueueData.jobId)
.take(55)
)
.flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
.filter(result => result.completed)
.subscribe(
result => console.log('Result', result),
error => console.log('Error', error)
);
- 有没有一种方法可以在数据到达或发生错误时停止定时器?我现在可以引入新的observable,然后使用
takeUntil
- 是
flatMap
这里用法的语义正确吗?也许这整个事情应该重写,而不是与flatMap
链接?
从顶部开始,你有承诺,你变成了可观察。一旦这个值产生一个值,你希望每秒拨打一次电话,直到你收到特定的响应(成功)或者经过一定的时间。我们可以这样解释每个部分映射到RX方法:
“一旦这会产生一个值” = map
/flatMap
(在这种情况下flatMap
因为接下来会也将是可观的,我们需要扁平化出来)
“每秒一次”= interval
“收到一定的响应”= filter
“或”= amb
“的时候一定已经过去了” = timer
从这里,我们可以拼凑出它像这样:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.amb(
Rx.Observable.timer(60000)
.flatMap(() => Rx.Observable.throw(new Error('Timeout')))
)
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
一旦我们得到了我们的初步结果,我们预计分为两个之间的竞争可观察到的东西,当它收到成功的回应时会产生价值的东西,以及在一定时间过去后会产生价值的东西。第二个flatMap
这是因为.throw
不存在于可观察的实例上,并且Rx.Observable
上的方法返回一个观察值,该观察值也需要展平。
事实证明,在amb
/timer
组合实际上可通过timeout
代替,就像这样:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
我省略了.delay
您的样品中有,因为它是不是你所需要的逻辑描述,但它可以适用于这个解决方案。
因此,要直接回答你的问题:
- 在上面的代码有没有需要手动停止任何东西,因为
interval
将被设置在用户数量下降到零的时刻,这将发生当take(1)
或amb
/timeout
完成。 - 是的,你原来的两种用法都是有效的,因为在这两种情况下,你都将可观察元素的每个元素都投影到一个新的可观察元素中,并且希望将可观察到的可观察元素展平成一个常规可观察元素。
Here's the jsbin我扔一起测试溶液(你可以调整在pollQueueForResult
返回到获得期望的成功/超时值;倍已经由10快速测试起见划分)。
来自@ matt-burnell的优秀答案的一个小优化。您可以更换过滤和采取运营商与第一操作员
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.first(x => x.completed)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
);
此外如下,供人可能不知道,flatMap运营商是别名mergeMap在RxJS 5.0。
根据https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md flatMap仍然在RxJS 5中有效 – gerasalus
感谢您的详细解释! – gerasalus
@ matt-burnell非常好的答案,这让我无法结束!你有关于做指数后退的任何提示吗? –
@VegardLarsen那么,在这种情况下,如果你想退出轮询间隔,你所需要做的就是用你想要的值替换间隔(1000)流。例如,您可以使用Observable.just,delay和merge的组合来创建一个流,该流将在1,2,4,8,16和32秒标记处产生值(极低技术含量;可以是当然写一个函数来表达这个更优雅)。 –