观察同步结果和异步结果

观察同步结果和异步结果

问题描述:

使用Rx,我想观察一个传统对象,该对象公开了方法GetItems和事件NewItem观察同步结果和异步结果

当调用GetItems时,它将同步返回缓存中所有项的列表。它还会产生异步获取项目,这些项目将在收到时通过NewItem事件发布。

如何通过编写LINQ查询以一致的方式观察这两个源(sync + async),以便捕获这两个结果集?生产订单并不重要。

+0

你是什么意思的'一致'?你的意思是“在处理来自NewItem的任何项目之前,处理来自GetItems的所有项目?” –

+0

@Chris:对不起模糊 - 我想制定一个LINQ查询,以便捕获两个结果集。生产订单并不重要。 –

+0

@JohannGerell:Enigmativity在我原来的解决方案中发现了一个竞争条件。请看我更新的答案。 –

让我看看我是否理解你的遗留物体。我假设它是一个通用型,看起来像这样:

public class LegacyObject<T> 
{ 
    public IEnumerable<T> GetItems(); 
    public event EventHandler<NewItemEventArgs<T>> NewItem; 
} 

随着新项目的事件参数是这样的:

public class NewItemEventArgs<T> : System.EventArgs 
{ 
    public T NewItem { get; private set; } 
    public NewItemEventArgs(T newItem) 
    { 
     this.NewItem = newItem; 
    } 
} 

现在,我已经创建了一个LegacyObject<T>扩展.ToObservable()方法:

public static IObservable<T> ToObservable<T>(
    this LegacyObject<T> @this) 
{ 
    return Observable.Create<T>(o => 
    { 
     var gate = new object(); 
     lock (gate) 
     { 
      var list = new List<T>(); 
      var subject = new Subject<T>(); 
      var newItems = Observable 
       .FromEventPattern<NewItemEventArgs<T>>(
        h => @this.NewItem += h, 
        h => @this.NewItem -= h) 
       .Select(ep => ep.EventArgs.NewItem); 
      var inner = newItems.Subscribe(ni => 
      { 
       lock (gate) 
       { 
        if (!list.Contains(ni)) 
        { 
         list.Add(ni); 
         subject.OnNext(ni); 
        } 
       } 
      }); 
      list.AddRange(@this.GetItems()); 
      var outer = list.ToArray().ToObservable() 
       .Concat(subject).Subscribe(o); 
      return new CompositeDisposable(inner, outer); 
     } 
    }); 
} 

此方法创建为每个用户一个新的观察到 - 这是编写扩展方法,像这样的时候做正确的事情。

它创建一个gate对象来锁定对内部列表的访问。

因为您表示致电GetItems的行为会产生异步功能以获取新项目,所以我确保NewItem订阅是在致电GetItems之前创建的。

inner订阅会检查新项目是否在列表中,如果该项目不在列表中,只会调用OnNext

GetItems进行调用并将值通过AddRange添加到内部列表中。

NewItem事件开始在另一个线程上触发之前,不太可能但可能会将项目添加到列表中。这就是为什么锁定访问列表的原因。 inner订阅将等待,直到它可以获取锁定,然后再尝试将项目添加到列表中,这会在初始项目添加到列表后发生。

最后,内部列表变成可观察值,与主题连接,并且观察者订阅此可观察值。

使用CompositeDisposable将两个订阅作为单个IDisposable返回。

这就是它的ToObservable方法。

现在,我通过在遗留对象上创建一个构造函数来测试这个问题,它允许我传递一个枚举值和一个可观察值。当调用GetItems并且可观察值驱动NewItem事件时,将返回枚举值。

所以,我的测试代码是这样的:

var tester = new Subject<int>(); 
var legacy = new LegacyObject<int>(new [] { 1, 2, 3, }, tester); 

var values = legacy.ToObservable(); 

values.Subscribe(v => Console.WriteLine(v)); 

tester.OnNext(3); 
tester.OnNext(4); 
tester.OnNext(4); 
tester.OnNext(5); 

并写入到控制台中值分别为:

1 
2 
3 
4 
5 

让我知道这是否符合您的需求。

+0

谢谢 - 我不知所措!你说*这个方法为每个订阅者创建一个新的可观察对象 - 这是写这样的扩展方法时要做的正确的事情。* - 你有一些我可以阅读的文本的参考吗? –

+0

@JohannGerell - 请参阅[Rx设计指南](http://go.microsoft.com/fwlink/?LinkID=205219)第17页。 – Enigmativity

+0

完美! (在问这个问题之前,我实际上浏览了那个文档,但是那个特定的内容并没有粘住...... ;-) –

如果我理解正确你的问题,也可能是使用像这样做:

legacyObject.GetItems().ToObservable() 
    .Merge(
     Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem") 
      .Select(e => e.EventArgs.Value)); 

编辑:Enigmativity发现上面的解决方案(见下文评论)的竞争条件。这个有望解决这个问题:

Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem") 
    .Select(e => e.EventArgs.Value) 
    .Merge(Observable.Defer(() => legacyObject.GetItems().ToObservable())); 

这是我的测试代码的其余部分,如果它有帮助。我不知道我是否准确地模拟了你的遗产班级:

class IntEventArgs : EventArgs 
{ 
    public IntEventArgs(int value) 
    { 
     Value = value; 
    } 

    public int Value { get; private set; } 
} 

class Legacy 
{ 
    public event EventHandler<IntEventArgs> NewItem; 

    public IList<int> GetItems() 
    { 
     GenerateNewItemAsync(); 
     return new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; 
    } 

    private void GenerateNewItemAsync() 
    { 
     Task.Factory.StartNew(() => 
     { 
      for (var i = 0; i < 100000; ++i) 
      { 
       var handler = NewItem; 
       if (handler != null) handler(this, new IntEventArgs(i)); 
       Thread.Sleep(500); 
      } 
     }); 
    } 
} 

class Example 
{ 
    static void Main() 
    { 
     var legacyObject = new Legacy(); 

     legacyObject.GetItems().ToObservable() 
      .Merge(
       Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem") 
        .Select(e => e.EventArgs.Value)) 
      .Subscribe(Console.WriteLine); 

     Console.ReadLine(); 
    } 
} 
+0

真的很好!遗产班是现场。 –

+1

该解决方案存在争用条件,因为在事件处理程序订阅之前,'GetItems'在'i'等于'0'时触发第一个值。事实上,如果你从'GenerateNewItemAsync'方法中取出'Thread.Sleep(50)',那么在我的测试中,会丢失前34个值(0到33)。 – Enigmativity

+0

好的。如果我扭转小河的构成怎么样?让我编辑我的答案,并告诉我你的想法。 –