具有无限序列是真的荏苒则始终为假

问题描述:

我做了一个扩展方法:具有无限序列是真的荏苒则始终为假

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    return 
     source.TimeInterval() 
      .Select(
       (x, i) => 
        Observable.Return(x.Value) 
         .Delay(i == 0 
          ? TimeSpan.Zero 
          : TimeSpan.FromTicks(
            Math.Max(minDelay.Ticks - x.Interval.Ticks, 0)))) 
      .Concat(); 
} 

这将创建一个新的观察到的,只有通过与时间的最小间隔让项目。

要消除初始等待时间,有必要以不同的方式处理第一个项目。

可以看出,有一个测试,看看我们是通过测试i == 0来处理第一个项目。这里的问题是如果我们处理超过int.MaxValue项目,这将失败。

相反,我想到了以下顺序

var trueThenFalse = Observable.Return(true) 
        .Concat(Observable.Repeat(false)) 

,并拉上它在靠我的源:

source.TimeInterval().Zip(trueThenFalse, ... 

但通过这个无限序列拉链时,我们似乎进入一个死循环其中trueThenFalse一次发出所有物品(无限)。失败。

我可以很容易地编写这个带有副作用的代码(例如在外部范围内的bool),但这会代表我不满意的纯度损失。

有什么建议吗?

编辑

虽然不太一样的行为,下面的代码显示出一些讨厌的特质

var trueThenFalse = Observable.Return(true) 
    .Concat(Observable.Repeat(false)); 
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes 
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x)); 

,并最终死亡与OOME。这是因为trueThenFalse似乎取消了所有的值,但它们并未被Zip及时消耗。

+0

似乎是trueThenFalse和邮编我工作。 – Evk

+0

因此,这可能与延续计划的方式有关?鉴于'IObservable'只提供'Subscribe'方法,所以我很难理解无限(冷)序列在订阅这个“push”模型时不会立即尝试解除其所有项目的解除操作。 – spender

+0

但zip从两个序列中获取下一个元素。首先它从第一个序列中获得“真实”,从第二个(没有延迟)中获得第一个值。然后它从第二个序列中获得“假”和第二个值,但现在有一个延迟。不知道为什么它应该立即对所有物品进行整理也许你可以用trueThenFalse和Zip发布代码,你说失败了? – Evk

所以事实证明,Zip有another overload可以将一个IObservable序列与IEnumerable序列一起压缩。

通过将IObservable的推送语义与IEnumerable的拉语义相结合,可以使我的测试用例工作。

所以,用下面的方法:

private IEnumerable<T> Inf<T>(T item) 
{ 
    for (;;) 
    { 
     yield return item; 
    } 
} 

我们可以做出一个IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 

,然后用观察到的源住口:

var src = Observable.Interval(TimeSpan.FromSeconds(1)); 
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x)); 

...一切都按预期工作。

我现在有以下实现我的RateLimiter方法:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 
    return 
     source.TimeInterval() 
      .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value) 
       .Delay(firstTime 
        ? TimeSpan.Zero 
        : TimeSpan.FromTicks(
         Math.Max(minDelay.Ticks - item.Interval.Ticks, 0)))) 

      .Concat(); 
} 

这类似于Rx IObservable buffering to smooth out bursts of events,但你显然正在试图理解为什么你的解决方案并/不起作用。

我觉得那里的解决方案更优雅,尽管每个人都有自己的解决方案。