RX for .NET - 从Disposable.Create中调用observer.OnCompleted时,什么也没有发生

问题描述:

我在C#中使用.NET的RX库。任何人都可以向我解释为什么'observer.OnCompleted()方法什么也不做下面的代码:RX for .NET - 从Disposable.Create中调用observer.OnCompleted时,什么也没有发生

var observableStream = Observable.Create<CustomMessage>(
     (observer) => 
      { 
       CustomMessage cm = new CustomMessage(); 
       CustomMessage.Subscribe(observer.OnNext); 

       return Disposable.Create(
       () => 
        { 
         Console.WriteLine("Disposing..."); 
         CustomMessage.Unsubscribe(observer.OnNext);       
         observer.OnCompleted();    //***Nothing happens here*** 
        } 
       ); 
     }); 

    //IObserver.OnException() 
    public override void OnException(Exception e) 
    { 
     Console.WriteLine("Exception occurred - " + e.Message); 
    } 

    //IObserver.OnComplete() 
    public override void OnUnsubscribe() 
    { 
     Console.WriteLine("Unsubscribed...");    
    } 

    //IObserver.OnNext() 
    public override void HandleNextMsg(IRVMessage msg) 
    { 
     Console.WriteLine("Instance received a message"); 
    } 

IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe); 

//At some later point.... 
myDisposable.Dispose(); 

代码的目的是使订阅CustomMessages流。它在设置订阅时使用我的CustomMessage类型注册observer.OnNext()方法。然后它在注销订阅时取消注册observer.OnNext()。所有这些都能正常工作。每当收到CustomMessage时,我的'HandleNextMsg()'方法都会被调用。

在稍后一点,当我想结束我的订阅我称之为“的Dispose()”和下面两行执行成功:

Console.WriteLine("Disposing..."); 
CustomMessage.Unsubscribe(observer.OnNext); 

然后我收到没有更多CustomMessages。但是下面一行,虽然执行,不执行任何操作:

observer.OnCompleted(); 

我希望它来调用行:

Console.WriteLine("Unsubscribed..."); 

在某一点上的观察者和“OnUnsubscribe”方法之间的连接丢失,我想了解到底发生了什么。 'observer.OnNext()'如何被成功注销,但'observer.OnCompleted()'什么也不做?

有人向我指出,仅仅因为我处理流并不意味着我应该调用'OnCompleted()',但我仍然想明白为什么它不起作用。

您看到的问题是由于您通过的功能Observable.Create和您订阅IObservable所产生的任何观察者而造成的。基本上,流程如下:

  • Observable.Create返回AnonymousObservable。
  • AnonymousObservable将观察者封装在Subscribe中的AutoDetachObserver中。
  • AnonymousObservable从Subscribe中返回AutoDetachObserver(实现IDisposable)。
  • AutoDetachObserver.Dispose设置其停止标志和然后配置对象您原来的订阅功能返回。此标志导致观察者忽略对OnError和OnCompleted的未来调用,从而导致不会调用包装的观察者方法。

这个答案是基于RX的v1.x,但我预计这在v2.0中并没有改变。

如果您有一些代码需要运行,无论订阅结束如何(OnError,OnCompleted或Dispose),我会建议Observable.Finally

+0

非常好的解释谢谢! – JMc

+0

也适用于Rx Java! – pommedeterresautee

OnCompleted()是为了通知订阅者上游序列(您的案例中的CustomMessage)已经结束。这并不意味着订户要求取消订阅成功的确认,这似乎是您如何使用它。 OnCompleted()是关于序列的通知,对于所有订阅者,不是针对该序列的单个订阅。

换句话说,您不应该在Dispose中调用它。毕竟,用户正在自行解散,为什么需要通知?

至于没有任何事情发生的实际技术原因,我猜测回调(通过设计)不会在您已经处置时进行。只是一个理论,这不是很相关。

+1

很好的解释谢谢。正如我所提到的,我后来猜测,调用OnCompleted是不正确的,但你已经给出了一个很好的解释,为什么这是这种情况,谢谢。 – JMc