使用被动扩展处理将密钥合并到1秒的事件更新

使用被动扩展处理将密钥合并到1秒的事件更新

问题描述:

我以150-200次更新/秒的速度得到更新事件。我想把这个混淆为每个键1秒。使用被动扩展处理将密钥合并到1秒的事件更新

实施例:在图1第二I收到3个键A,B,C的更新中的顺序:A1,B1,C1,A2,A3,B2

我想处理这个更新每1秒&仅处理来自上述示例的A3,B2 & C1。

如何使用Reactive扩展来解决这个问题? 到目前为止,我尝试过:

Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default) 
      .GroupBy(x => x.EventArgs.Key) 
      .Subscribe(g => 
      { 
       g.Sample(TimeSpan.FromSeconds(1)) 
       .Subscribe(x1 => 
       { 
        updateSubject.OnNext(key); 
       }); 
      }); 

当然不是我期待的。请为此提出正确的方法。

你需要的是更多的东西是这样的:

Observable 
    .FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default) 
    .GroupBy(x => x.EventArgs.Key) 
    .Select(g => g.Sample(TimeSpan.FromSeconds(1.0))) 
    .Merge() 
    .Subscribe(x => 
    { 
     updateSubject.OnNext(key); 
    }); 

然而,这是一个非常糟糕的主意,有updateSubject.OnNext(key);.Subscribe内。你真的应该展示更多的代码,以便我们可以建议如何正确处理。

没有太多之后

Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default) 
      .GroupBy(x => x.EventArgs.Key) 
      .Subscribe(g => 
      { 
       g.Sample(TimeSpan.FromSeconds(1)) 
       .Subscribe(x1 => 
       { 
        updateSubject.OnNext(key); 
       }); 
      }); 

updateSubject 
.SubscribeOn(NewThreadScheduler.Default) 
.ObserveOn(NewThreadScheduler.Default) 
.Subscribe(EventHandler); //Event Handler is the what gets called to handle the events 

我做的合并并没有为我做到这一点,但我想:

Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default) 
      .Distinct(x => x.EventArgs.Key) 
      .Sample(TimeSpan.FromSeconds(1)) 
      .Subscribe(x1 => 
      { 
        updateSubject.OnNext(x1.EventArgs.NewValue); 
      }); 

不知道如果我这样做的权利这里。