使用被动扩展处理将密钥合并到1秒的事件更新
问题描述:
我以150-200次更新/秒的速度得到更新事件。我想把这个混淆为每个键1秒。使用被动扩展处理将密钥合并到1秒的事件更新
实施例:在图1第二I收到3个键A,B,C的更新中的顺序:A1,B1,C1,A2,A3,B2
我想处理这个更新每1秒&仅处理来自上述示例的A3,B2 & C1。
如何使用Reactive扩展来解决这个问题? 到目前为止,我尝试过:
Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.GroupBy(x => x.EventArgs.Key)
.Subscribe(g =>
{
g.Sample(TimeSpan.FromSeconds(1))
.Subscribe(x1 =>
{
updateSubject.OnNext(key);
});
});
当然不是我期待的。请为此提出正确的方法。
答
你需要的是更多的东西是这样的:
Observable
.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.GroupBy(x => x.EventArgs.Key)
.Select(g => g.Sample(TimeSpan.FromSeconds(1.0)))
.Merge()
.Subscribe(x =>
{
updateSubject.OnNext(key);
});
然而,这是一个非常糟糕的主意,有updateSubject.OnNext(key);
您.Subscribe
内。你真的应该展示更多的代码,以便我们可以建议如何正确处理。
答
没有太多之后
Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.GroupBy(x => x.EventArgs.Key)
.Subscribe(g =>
{
g.Sample(TimeSpan.FromSeconds(1))
.Subscribe(x1 =>
{
updateSubject.OnNext(key);
});
});
updateSubject
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(EventHandler); //Event Handler is the what gets called to handle the events
我做的合并并没有为我做到这一点,但我想:
Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.Distinct(x => x.EventArgs.Key)
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(x1 =>
{
updateSubject.OnNext(x1.EventArgs.NewValue);
});
不知道如果我这样做的权利这里。