Observable.Zip当要压缩的序列数量未知时,直到运行时间
我需要为批准过程建模。之前它非常简单。两个角色不得不同意一些事情,然后我们可以去到下一个步骤:Observable.Zip当要压缩的序列数量未知时,直到运行时间
public class Approved
{
public string ApproverRole;
}
var approvals = Subscribe<Approved>();
var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP");
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP");
var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create);
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());
但现在有一个新的需求:角色批准的东西所需要的数量可以改变:
public class ApprovalRequested
{
public string[] Roles;
}
var approvalRequest = Subscribe<ApprovalRequested>().Take(1);
var approvals = Subscribe<Approved>();
var approvedByAll = ???;
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());
我觉得我错过了一些很明显的东西......任何人都可以指引我朝着正确的方向发展吗?
编辑
澄清:审批程序是对每个项目的基础。批准可以到达的顺序是未定义的。我们不关心一个角色是否多次批准某个项目。
问题实质上可以归结为从值的流中创建Set
,其中的值可能无序或本质上很多。
如果N是集合的基数,我们可以平凡地认为,只有至少有N种类型的值(在这种情况下,角色)被推送后,进程才会继续进行。
下面是Zip操作符的示例解决方案;或许这可以让你开始:
public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables)
{
return Observable.Create<IList<T>>(observer =>
{
List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>()));
return new CompositeDisposable(observables.Select((o, i) =>
o.Subscribe(value =>
{
lock (store)
{
store[i].Add(value);
if (store.All(list => list.Count > 0))
{
observer.OnNext(store.Select(list => list[0]).ToList());
store.ForEach(list => list.RemoveAt(0));
}
}
}))
);
});
}
测试:
这里 Observable.Interval(TimeSpan.FromSeconds(0.5))
.GroupBy(i => i % 3)
.Select(gr => gr.AsObservable())
.Buffer(3)
.SelectMany(set => set.Zip())
.Subscribe(v => Console.WriteLine(String.Join(",", v)));
的一个问题是,你可能会失去初始值,而正在形成的群体,所以你可能要为并入通过重写该方法为IObservable<IList<T>> Zip<TKey, T>(this IGroupedObservable<TKey, T> observables)
。
我无法找到此Zip过载。我正在使用包括Rx-Experimental在内的nuget运行Rx 2.0.20823。 –
@JoãoBragança我已经添加了一个Zip方法,它为您提供了一套基本的想法。 – Asti
Rx v2.0中存在以下过载: static IObservable
在当前版本的Rx(我从NuGet获得)中,有一个Zip()
版本,它需要一个观察值集合并返回一个可观察集合。有了这一点,你可以做这样的事情:
string[] requiredApprovals = …;
var approvedByAll = requiredApprovals
.Select(required => approvals.Where(a => a.ApproverRole == required))
.Zip();
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());
但作为@Enigmativity指出,这只会工作,如果你可以肯定,每个人以同样的顺序批准,所有项目最终会被批准所有必需的角色。如果不是,你需要一些比Zip()
更复杂的东西。
请参阅我对Asti的回答的评论。 –
我不知道为什么会这样。我也使用2.0.20823,它适用于我。虽然我没有Rx-Experimental。 – svick
啊,我相信这是因为你的目标是4.5,而我仍然在4.0。你有没有认识到为什么它不包括在内? –
'Zip'运算符希望成对的事情保持一步。没有销售副总裁的话,你在这里做的事情可能会得到财务副总裁的多项批准,并且事情可能会失去同步。你需要更好地定义你的需求。 – Enigmativity