RX for .NET - 从Disposable.Create中调用observer.OnCompleted时,什么也没有发生
我在C#中使用.NET的RX库。任何人都可以向我解释为什么'observer.OnCompleted()方法什么也不做下面的代码:RX for .NET - 从Disposable.Create中调用observer.OnCompleted时,什么也没有发生
var observableStream = Observable.Create<CustomMessage>(
(observer) =>
{
CustomMessage cm = new CustomMessage();
CustomMessage.Subscribe(observer.OnNext);
return Disposable.Create(
() =>
{
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
observer.OnCompleted(); //***Nothing happens here***
}
);
});
//IObserver.OnException()
public override void OnException(Exception e)
{
Console.WriteLine("Exception occurred - " + e.Message);
}
//IObserver.OnComplete()
public override void OnUnsubscribe()
{
Console.WriteLine("Unsubscribed...");
}
//IObserver.OnNext()
public override void HandleNextMsg(IRVMessage msg)
{
Console.WriteLine("Instance received a message");
}
IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);
//At some later point....
myDisposable.Dispose();
代码的目的是使订阅CustomMessages流。它在设置订阅时使用我的CustomMessage类型注册observer.OnNext()方法。然后它在注销订阅时取消注册observer.OnNext()。所有这些都能正常工作。每当收到CustomMessage时,我的'HandleNextMsg()'方法都会被调用。
在稍后一点,当我想结束我的订阅我称之为“的Dispose()”和下面两行执行成功:
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
然后我收到没有更多CustomMessages。但是下面一行,虽然执行,不执行任何操作:
observer.OnCompleted();
我希望它来调用行:
Console.WriteLine("Unsubscribed...");
在某一点上的观察者和“OnUnsubscribe”方法之间的连接丢失,我想了解到底发生了什么。 'observer.OnNext()'如何被成功注销,但'observer.OnCompleted()'什么也不做?
有人向我指出,仅仅因为我处理流并不意味着我应该调用'OnCompleted()',但我仍然想明白为什么它不起作用。
您看到的问题是由于您通过的功能Observable.Create
和您订阅IObservable
所产生的任何观察者而造成的。基本上,流程如下:
- Observable.Create返回AnonymousObservable。
- AnonymousObservable将观察者封装在Subscribe中的AutoDetachObserver中。
- AnonymousObservable从Subscribe中返回AutoDetachObserver(实现IDisposable)。
- AutoDetachObserver.Dispose设置其停止标志和然后配置对象您原来的订阅功能返回。此标志导致观察者忽略对OnError和OnCompleted的未来调用,从而导致不会调用包装的观察者方法。
这个答案是基于RX的v1.x,但我预计这在v2.0中并没有改变。
如果您有一些代码需要运行,无论订阅结束如何(OnError,OnCompleted或Dispose),我会建议Observable.Finally。
OnCompleted()是为了通知订阅者上游序列(您的案例中的CustomMessage)已经结束。这并不意味着订户要求取消订阅成功的确认,这似乎是您如何使用它。 OnCompleted()是关于序列的通知,对于所有订阅者,不是针对该序列的单个订阅。
换句话说,您不应该在Dispose中调用它。毕竟,用户正在自行解散,为什么需要通知?
至于没有任何事情发生的实际技术原因,我猜测回调(通过设计)不会在您已经处置时进行。只是一个理论,这不是很相关。
很好的解释谢谢。正如我所提到的,我后来猜测,调用OnCompleted是不正确的,但你已经给出了一个很好的解释,为什么这是这种情况,谢谢。 – JMc
非常好的解释谢谢! – JMc
也适用于Rx Java! – pommedeterresautee