SubscribeOn()在这里做什么?
我正在教我自己编写反应式编程,通过随机问题来解决问题,并且毫不羞耻地询问愚蠢的新手问题。在弄清楚线程调度如何工作的同时,我设法让自己陷入困境。虽然我很确定这段代码没有逻辑意义,但我也不明白发生了什么。指出这可能会帮助我。下面的代码:SubscribeOn()在这里做什么?
var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();
var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1),() => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2),() => emitter.OnCompleted());
var subscription = emitter.SubscribeOn(newThreadScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
Console.WriteLine("DONE.");
Console.ReadLine();
我预期是什么,也许:
one
DONE.
Complete!
可能出现的交织,因为我不太清楚SubscribeOn()会做。我得到的是:
DONE.
Complete!
究竟发生了什么?为什么该项目在完成之前没有生成?在这种情况下ObserveOn()的工作方式与我的预期一样,我理解为什么:它在其他某个线程上运行代理,并且它们可以与“完成”交错。那么SubscribeOn()究竟做了什么?
你在这里只是一个竞赛条件。
如果我们回来翻录的所有代码,只是
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();
var subscription = emitter
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
Console.WriteLine("DONE.");
Console.ReadLine();
我们得到了相同的结果。 通过使用Subject<T>
,您将不会获得任何缓存行为,但OnCompleted
通知的例外情况除外。
SubscribeOn
运营商将安排在提供的IScheduler
实例上完成任何订阅工作。 在订阅Subject<T>
的情况下,几乎没有工作要做。 这几乎和将回调注册到回调列表一样简单。
安排到NewThreadScheduler
的工作将创建一个新线程,然后创建一个内部事件循环来处理计划工作。 这很快,但确实需要创建一个新线程,一个EventloopScheduler并执行上下文切换到新线程。
在您的示例中,您安排了上的OnNext
和OnCompleted
通知。 然后你SubscribeOn
与NewThreadScheduler
。 接下来,您开始处理TestScheduler
实例的所有计划工作。 这些虚拟处理的计划项目,只是迭代计划的项目,执行委托和推进虚拟时钟。 这非常快。
更具体,下面的代码是类似于你所写的内容
var newThreadScheduler = new NewThreadScheduler();
var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));
foreach (var callback in callbacks)
{
callback("one");
}
Console.WriteLine("Done");
在这里,我们只是有回调操作的列表(打电话给他们的用户或观察员)。 然后,我们在一个新线程上异步计划添加其中一个回调。 然后立即迭代回调并将字符串“one”发送给它们中的每一个。 结果是
Done
的NewThreadScheduler
只是没有获得足够的时间来启动一个新的线程,调度动作,然后执行该操作,之前主线程可以通过收集迭代。
所以有几条指导原则,我认为你没有遵循: 1)避免主题;-) 2)不要混合使用线程和单元测试。我假设TestScheduler
的存在是因为你正在测试这个。但是,您可以使用两个TestScheduler
例如背景和前景实例。
为了提供更多帮助,我将提供建议您仅从测试中删除第二个调度程序的积极指导。 在您的SubscribeOn
运营商中使用TestScheduler
实例。
接下来,我建议用TestScheduler
的可观测序列工厂方法(即CreateColdObservable
)替换主题+调度的使用。 最后,我不知道是否前进到1秒的speicifc时间获得任何东西,而不仅仅是使用Start
方法。 我认为这会减少噪音和使用魔法值1s。
var testScheduler = new TestScheduler();
var source = testScheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));
var subscription = source.SubscribeOn(testScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.Start();
Console.WriteLine("DONE.");
Console.ReadLine();
现在唯一的问题是,SubscribeOn
调用是相当多余的。
FYI:代码为NewThreadScheduler
- https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
感谢这个非常详细的答案,这不是真正的生产代码,只是试图弄清楚事情是如何工作的,所以这使得它有点奇怪!这听起来像将TestScheduler与'真正'的调度程序混合会导致更多问题,所以我需要调整我的实验方式。 – OwenP
是同意的。要么使用testscheduler/historicalscheduler“虚拟化”时间,要么使用真正的时间 –
我什么都不知道的反应,而是看你的代码,你'newThreadScheduler'认购,但所有的工作和日程安排都对'testScheduler'触发。 – TyCobb