Observable.Zip当要压缩的序列数量未知时,直到运行时间

问题描述:

我需要为批准过程建模。之前它非常简单。两个角色不得不同意一些事情,然后我们可以去到下一个步骤:Observable.Zip当要压缩的序列数量未知时,直到运行时间

public class Approved 
{ 
    public string ApproverRole; 
} 

var approvals = Subscribe<Approved>(); 

var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP"); 
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP"); 

var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create); 

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess()); 

但现在有一个新的需求:角色批准的东西所需要的数量可以改变:

public class ApprovalRequested 
{ 
    public string[] Roles; 
} 
var approvalRequest = Subscribe<ApprovalRequested>().Take(1); 
var approvals = Subscribe<Approved>(); 

var approvedByAll = ???; 

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess()); 

我觉得我错过了一些很明显的东西......任何人都可以指引我朝着正确的方向发展吗?

编辑

澄清:审批程序是对每个项目的基础。批准可以到达的顺序是未定义的。我们不关心一个角色是否多次批准某个项目。

+1

'Zip'运算符希望成对的事情保持一步。没有销售副总裁的话,你在这里做的事情可能会得到财务副总裁的多项批准,并且事情可能会失去同步。你需要更好地定义你的需求。 – Enigmativity

问题实质上可以归结为从值的流中创建Set,其中的值可能无序或本质上很多。

如果N是集合的基数,我们可以平凡地认为,只有至少有N种类型的值(在这种情况下,角色)被推送后,进程才会继续进行。

下面是Zip操作符的示例解决方案;或许这可以让你开始:

public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables) 
    { 
     return Observable.Create<IList<T>>(observer => 
     { 
      List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>())); 

      return new CompositeDisposable(observables.Select((o, i) => 
       o.Subscribe(value => 
       { 
        lock (store) 
        { 
         store[i].Add(value); 

         if (store.All(list => list.Count > 0)) 
         { 
          observer.OnNext(store.Select(list => list[0]).ToList()); 
          store.ForEach(list => list.RemoveAt(0)); 
         } 
        } 
       })) 
      ); 
     }); 
    } 

测试:

这里
 Observable.Interval(TimeSpan.FromSeconds(0.5)) 
        .GroupBy(i => i % 3) 
        .Select(gr => gr.AsObservable()) 
        .Buffer(3)      
        .SelectMany(set => set.Zip()) 
        .Subscribe(v => Console.WriteLine(String.Join(",", v))); 

的一个问题是,你可能会失去初始值,而正在形成的群体,所以你可能要为并入通过重写该方法为IObservable<IList<T>> Zip<TKey, T>(this IGroupedObservable<TKey, T> observables)

+0

我无法找到此Zip过载。我正在使用包括Rx-Experimental在内的nuget运行Rx 2.0.20823。 –

+0

@JoãoBragança我已经添加了一个Zip方法,它为您提供了一套基本的想法。 – Asti

+0

Rx v2.0中存在以下过载: static IObservable > Zip (此IEnumerable > sources) –

在当前版本的Rx(我从NuGet获得)中,有一个Zip()版本,它需要一个观察值集合并返回一个可观察集合。有了这一点,你可以做这样的事情:

string[] requiredApprovals = …; 

var approvedByAll = requiredApprovals 
    .Select(required => approvals.Where(a => a.ApproverRole == required)) 
    .Zip(); 

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess()); 

但作为@Enigmativity指出,这只会工作,如果你可以肯定,每个人以同样的顺序批准,所有项目最终会被批准所有必需的角色。如果不是,你需要一些比Zip()更复杂的东西。

+0

请参阅我对Asti的回答的评论。 –

+0

我不知道为什么会这样。我也使用2.0.20823,它适用于我。虽然我没有Rx-Experimental。 – svick

+0

啊,我相信这是因为你的目标是4.5,而我仍然在4.0。你有没有认识到为什么它不包括在内? –