具有无限序列是真的荏苒则始终为假
问题描述:
我做了一个扩展方法:具有无限序列是真的荏苒则始终为假
public static IObservable<T> RateLimit<T>(this IObservable<T> source,
TimeSpan minDelay)
{
return
source.TimeInterval()
.Select(
(x, i) =>
Observable.Return(x.Value)
.Delay(i == 0
? TimeSpan.Zero
: TimeSpan.FromTicks(
Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
.Concat();
}
这将创建一个新的观察到的,只有通过与时间的最小间隔让项目。
要消除初始等待时间,有必要以不同的方式处理第一个项目。
可以看出,有一个测试,看看我们是通过测试i == 0
来处理第一个项目。这里的问题是如果我们处理超过int.MaxValue
项目,这将失败。
相反,我想到了以下顺序
var trueThenFalse = Observable.Return(true)
.Concat(Observable.Repeat(false))
,并拉上它在靠我的源:
source.TimeInterval().Zip(trueThenFalse, ...
但通过这个无限序列拉链时,我们似乎进入一个死循环其中trueThenFalse
一次发出所有物品(无限)。失败。
我可以很容易地编写这个带有副作用的代码(例如在外部范围内的bool
),但这会代表我不满意的纯度损失。
有什么建议吗?
编辑
虽然不太一样的行为,下面的代码显示出一些讨厌的特质
var trueThenFalse = Observable.Return(true)
.Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));
,并最终死亡与OOME。这是因为trueThenFalse
似乎取消了所有的值,但它们并未被Zip及时消耗。
答
所以事实证明,Zip有another overload可以将一个IObservable序列与IEnumerable序列一起压缩。
通过将IObservable的推送语义与IEnumerable的拉语义相结合,可以使我的测试用例工作。
所以,用下面的方法:
private IEnumerable<T> Inf<T>(T item)
{
for (;;)
{
yield return item;
}
}
我们可以做出一个IEnumerable:
var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
,然后用观察到的源住口:
var src = Observable.Interval(TimeSpan.FromSeconds(1));
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));
...一切都按预期工作。
我现在有以下实现我的RateLimiter方法:
public static IObservable<T> RateLimit<T>(this IObservable<T> source,
TimeSpan minDelay)
{
var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
return
source.TimeInterval()
.Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value)
.Delay(firstTime
? TimeSpan.Zero
: TimeSpan.FromTicks(
Math.Max(minDelay.Ticks - item.Interval.Ticks, 0))))
.Concat();
}
答
这类似于Rx IObservable buffering to smooth out bursts of events,但你显然正在试图理解为什么你的解决方案并/不起作用。
我觉得那里的解决方案更优雅,尽管每个人都有自己的解决方案。
似乎是trueThenFalse和邮编我工作。 – Evk
因此,这可能与延续计划的方式有关?鉴于'IObservable'只提供'Subscribe'方法,所以我很难理解无限(冷)序列在订阅这个“push”模型时不会立即尝试解除其所有项目的解除操作。 – spender
但zip从两个序列中获取下一个元素。首先它从第一个序列中获得“真实”,从第二个(没有延迟)中获得第一个值。然后它从第二个序列中获得“假”和第二个值,但现在有一个延迟。不知道为什么它应该立即对所有物品进行整理也许你可以用trueThenFalse和Zip发布代码,你说失败了? – Evk