观察同步结果和异步结果
使用Rx,我想观察一个传统对象,该对象公开了方法GetItems
和事件NewItem
。观察同步结果和异步结果
当调用GetItems
时,它将同步返回缓存中所有项的列表。它还会产生异步获取项目,这些项目将在收到时通过NewItem
事件发布。
如何通过编写LINQ查询以一致的方式观察这两个源(sync + async),以便捕获这两个结果集?生产订单并不重要。
让我看看我是否理解你的遗留物体。我假设它是一个通用型,看起来像这样:
public class LegacyObject<T>
{
public IEnumerable<T> GetItems();
public event EventHandler<NewItemEventArgs<T>> NewItem;
}
随着新项目的事件参数是这样的:
public class NewItemEventArgs<T> : System.EventArgs
{
public T NewItem { get; private set; }
public NewItemEventArgs(T newItem)
{
this.NewItem = newItem;
}
}
现在,我已经创建了一个LegacyObject<T>
扩展.ToObservable()
方法:
public static IObservable<T> ToObservable<T>(
this LegacyObject<T> @this)
{
return Observable.Create<T>(o =>
{
var gate = new object();
lock (gate)
{
var list = new List<T>();
var subject = new Subject<T>();
var newItems = Observable
.FromEventPattern<NewItemEventArgs<T>>(
h => @this.NewItem += h,
h => @this.NewItem -= h)
.Select(ep => ep.EventArgs.NewItem);
var inner = newItems.Subscribe(ni =>
{
lock (gate)
{
if (!list.Contains(ni))
{
list.Add(ni);
subject.OnNext(ni);
}
}
});
list.AddRange(@this.GetItems());
var outer = list.ToArray().ToObservable()
.Concat(subject).Subscribe(o);
return new CompositeDisposable(inner, outer);
}
});
}
此方法创建为每个用户一个新的观察到 - 这是编写扩展方法,像这样的时候做正确的事情。
它创建一个gate
对象来锁定对内部列表的访问。
因为您表示致电GetItems
的行为会产生异步功能以获取新项目,所以我确保NewItem
订阅是在致电GetItems
之前创建的。
inner
订阅会检查新项目是否在列表中,如果该项目不在列表中,只会调用OnNext
。
对GetItems
进行调用并将值通过AddRange
添加到内部列表中。
在NewItem
事件开始在另一个线程上触发之前,不太可能但可能会将项目添加到列表中。这就是为什么锁定访问列表的原因。 inner
订阅将等待,直到它可以获取锁定,然后再尝试将项目添加到列表中,这会在初始项目添加到列表后发生。
最后,内部列表变成可观察值,与主题连接,并且观察者订阅此可观察值。
使用CompositeDisposable
将两个订阅作为单个IDisposable
返回。
这就是它的ToObservable
方法。
现在,我通过在遗留对象上创建一个构造函数来测试这个问题,它允许我传递一个枚举值和一个可观察值。当调用GetItems
并且可观察值驱动NewItem
事件时,将返回枚举值。
所以,我的测试代码是这样的:
var tester = new Subject<int>();
var legacy = new LegacyObject<int>(new [] { 1, 2, 3, }, tester);
var values = legacy.ToObservable();
values.Subscribe(v => Console.WriteLine(v));
tester.OnNext(3);
tester.OnNext(4);
tester.OnNext(4);
tester.OnNext(5);
并写入到控制台中值分别为:
1
2
3
4
5
让我知道这是否符合您的需求。
谢谢 - 我不知所措!你说*这个方法为每个订阅者创建一个新的可观察对象 - 这是写这样的扩展方法时要做的正确的事情。* - 你有一些我可以阅读的文本的参考吗? –
@JohannGerell - 请参阅[Rx设计指南](http://go.microsoft.com/fwlink/?LinkID=205219)第17页。 – Enigmativity
完美! (在问这个问题之前,我实际上浏览了那个文档,但是那个特定的内容并没有粘住...... ;-) –
如果我理解正确你的问题,也可能是使用像这样做:
legacyObject.GetItems().ToObservable()
.Merge(
Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
.Select(e => e.EventArgs.Value));
编辑:Enigmativity发现上面的解决方案(见下文评论)的竞争条件。这个有望解决这个问题:
Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
.Select(e => e.EventArgs.Value)
.Merge(Observable.Defer(() => legacyObject.GetItems().ToObservable()));
这是我的测试代码的其余部分,如果它有帮助。我不知道我是否准确地模拟了你的遗产班级:
class IntEventArgs : EventArgs
{
public IntEventArgs(int value)
{
Value = value;
}
public int Value { get; private set; }
}
class Legacy
{
public event EventHandler<IntEventArgs> NewItem;
public IList<int> GetItems()
{
GenerateNewItemAsync();
return new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
}
private void GenerateNewItemAsync()
{
Task.Factory.StartNew(() =>
{
for (var i = 0; i < 100000; ++i)
{
var handler = NewItem;
if (handler != null) handler(this, new IntEventArgs(i));
Thread.Sleep(500);
}
});
}
}
class Example
{
static void Main()
{
var legacyObject = new Legacy();
legacyObject.GetItems().ToObservable()
.Merge(
Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
.Select(e => e.EventArgs.Value))
.Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
真的很好!遗产班是现场。 –
该解决方案存在争用条件,因为在事件处理程序订阅之前,'GetItems'在'i'等于'0'时触发第一个值。事实上,如果你从'GenerateNewItemAsync'方法中取出'Thread.Sleep(50)',那么在我的测试中,会丢失前34个值(0到33)。 – Enigmativity
好的。如果我扭转小河的构成怎么样?让我编辑我的答案,并告诉我你的想法。 –
你是什么意思的'一致'?你的意思是“在处理来自NewItem的任何项目之前,处理来自GetItems的所有项目?” –
@Chris:对不起模糊 - 我想制定一个LINQ查询,以便捕获两个结果集。生产订单并不重要。 –
@JohannGerell:Enigmativity在我原来的解决方案中发现了一个竞争条件。请看我更新的答案。 –