这可能使用Reactive Framework吗?
我有我的C#4.0应用程序中的对象列表。假设这个列表包含100个学生类的对象。 Reactive Framework有什么方法可以每次同时执行10个对象?这可能使用Reactive Framework吗?
每个学生对象都运行一些耗时约10到15秒的方法。因此,第一次通过时,从列表中取出前10个学生对象,并等待所有10个学生对象完成其工作,然后取下10个学生对象,依此类推直到完成列表中的全部项目?
- 我有一个
List<Student>
100计数。 - 首先从列表中取出10个项目,并且同时调用每个对象的长跑方法。
- 接收每个对象的返回值并更新UI [订阅部分]。
- 只有前10轮完成并释放所有内存时,才开始下一轮。
- 对列表中的所有项目重复相同的过程。
- 如何捕捉每个过程中的错误?
- 如何从内存中释放每个学生对象的资源和其他资源?
- 哪个是在Reactive Framework中完成所有这些事情的最佳方法?
我尝试....
var students = new List<Student>();
{....}
var cancel = students
.ToObservable(Scheduler.Default)
.Window(10)
.Merge(1)
.Subscribe(tenStudents =>
{
tenStudents.ObserveOn(Scheduler.Default)
.Do(x => DoSomeWork(x))
.ObserverOnDispatcher()
.Do(tenStudents => UpdateUI(tenStudents))
.Subscribe();
});
这个版本将始终在同一时间运行的10名学生。当学生完成时,另一个将开始。当每个学生完成时,你可以处理它有的任何错误,然后清理它(这将在下一个学生开始运行之前发生)。
students
.ToObservable()
.Select(student => Observable.Defer(() => Observable.Start(() =>
{
// do the work for this student, then return a Tuple of the student plus any error
try
{
student.DoWork();
return { Student = student, Error = (Exception)null };
}
catch (Exception e)
{
return { Student = student, Error = e };
}
})))
.Merge(10) // let 10 students be executing in parallel at all times
.Subscribe(studentResult =>
{
if (studentResult.Error != null)
{
// handle error
}
studentResult.Student.Dispose(); // if your Student is IDisposable and you need to free it up.
});
这不正是你问什么,因为它没有开始下一批次之前完成第一批10。这总是保持10并行运行。如果你真的想批10我会调整的代码。
感谢您的回复。我对RX很陌生并试图研究它。你们都很有帮助。在这个解决方案中,我发现只需要10个学生,然后10个学生完成后,就看不到任何代码,以便接下来10个学生(如批次),直到它覆盖100个学生。学生名单集合包含100名学生。由于所有100名学生并行运行可能会导致内存异常,我的计划是分批运行学生10.请帮助。 – user2017793 2013-02-28 00:26:37
'.Merge(10)'这样做。把它想象成一个酒吧里的保镖。一次只允许10名学生参加。只要其中一名学生完成并离开,那里只剩下9名学生,所以'Merge'会让另一名学生进入,直到所有100名学生都被处理完毕。 – Brandon 2013-02-28 03:31:35
感谢帮助。一次真正需要10名学生,下一次需要10个学生。当它需要一批10人时,Rx中是否有任何方法可以运行,这10名学生可以并行运行。上述解决方案正在等待每个批次的完成,然后只会调用订阅。在订阅中,我有另一个消息来更新wcf客户端中的UI。但等待每批的完整执行杀死时间。我正在寻找更快的RX方式。有没有什么办法可以批量并行运行成员? – user2017793 2013-02-28 10:46:34
这对我来说听起来非常像TPL的问题。你有一组已知的数据。您想分割一些繁重的处理并行运行,并且希望能够批处理负载。
我没有看到问题的任何地方是异步的源代码,是运动数据的源代码或需要被动的消费者。这是我建议您使用TPL的理由。
在另一个注释中,为什么要并行处理10个幻数?这是业务需求,还是潜在的优化性能的尝试?通常最好的做法是让TaskPool根据核心数量和当前负载计算出最适合客户端CPU的最佳实践。我想,随着设备及其CPU结构(单核,多核,多核,低功耗/禁用内核等)的巨大变化,这变得越来越重要。
这里有一种方法可以做到这在LinqPad(但要注意缺乏Rx的)
void Main()
{
var source = new List<Item>();
for (int i = 0; i < 100; i++){source.Add(new Item(i));}
//Put into batches of ten, but only then pass on the item, not the temporary tuple construct.
var batches = source.Select((item, idx) =>new {item, idx})
.GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item);
//Process one batch at a time (serially), but process the items of the batch in parallel (concurrently).
foreach (var batch in batches)
{
"Processing batch...".Dump();
var results = batch.AsParallel().Select (item => item.Process());
foreach (var result in results)
{
result.Dump();
}
"Processed batch.".Dump();
}
}
public class Item
{
private static readonly Random _rnd = new Random();
private readonly int _id;
public Item(int id)
{
_id = id;
}
public int Id { get {return _id;} }
public double Process()
{
var threadId = Thread.CurrentThread.ManagedThreadId;
string.Format("Processing on thread:{0}", threadId).Dump(Id);
var loopCount = _rnd.Next(10000,1000000);
Thread.SpinWait(loopCount);
return _rnd.NextDouble();
}
public override string ToString()
{
return string.Format("Item:{0}", _id);
}
}
如果你有一个数据在运动问题或反应我很想找出消费者问题,但只是“淡化”了问题,以便于解释。
谢谢阿隆。你能解释一下你的代码吗?非常感谢 – user2017793 2013-02-26 14:12:07
真的很简单。窗口(10)将工作转换为块10.合并(1)在单个线程上工作。将这10名学生转换成一个内部可观察的。呃,做一些工作吧。 ObserveOnDispatcher()返回到下一位的UI线程。 Do ...嗯...在UpdatingUI上工作。最后订阅内部可观察。冲洗并重复。 – Aron 2013-02-26 14:55:38
再次感谢阿隆。我的疑问是如何释放每个10个学生对象资源。你的解释非常有帮助,非常感谢。我担心内存不足问题。请帮助我。 – user2017793 2013-02-26 15:08:15