如何创建一个Rx observable,当最后一个观察者取消订阅时停止发布事件?

如何创建一个Rx observable,当最后一个观察者取消订阅时停止发布事件?

问题描述:

我会创建一个observable(通过各种方法)并将其返回给感兴趣的各方,但是当他们完成了侦听时,我想拆除可观察对象,以免它继续占用资源。另一种方式是将其视为在酒吧子系统中创建主题。当没有人再订阅某个主题时,您不想再围绕主题及其过滤。如何创建一个Rx observable,当最后一个观察者取消订阅时停止发布事件?

的Rx已经有运营商,以满足您的需求 - 以及两人竟 - Publish & RefCount

下面是如何使用它们:

IObservable xs = ... 

var rxs = xs.Publish().RefCount(); 

var sub1 = rxs.Subscribe(x => { }); 
var sub2 = rxs.Subscribe(x => { }); 

//later 
sub1.Dispose(); 

//later 
sub2.Dispose(); 

//The underlying subscription to `xs` is now disposed of. 

简单。

如果我已经理解了你的问题,你想创建observable,这样当所有的订阅者都处理了它们的订阅,即没有更多的订阅者,那么你想执行一个清理功能,这将停止生产中的observable其他值。 如果这是你想要的,那么你可以这样做如下:

//Wrap a disposable 
public class WrapDisposable : IDisposable 
    { 
     IDisposable disp; 
     Action act; 
     public WrapDisposable(IDisposable _disp, Action _act) 
     { 
      disp = _disp; 
      act = _act; 
     } 
     void IDisposable.Dispose() 
     { 
      act(); 
      disp.Dispose(); 
     } 
    } 

    //Observable that we want to clean up after all subs are done 
    public static IObservable<long> GenerateObs(out Action cleanup) 
    { 
     cleanup =() => 
     { 
      Console.WriteLine("All subscribers are done. Do clean up"); 
     }; 
     return Observable.Interval(TimeSpan.FromSeconds(1)); 
    } 
    //Wrap the observable 
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone) 
    { 
     int count = 0; 
     return Observable.CreateWithDisposable<T>(ob => 
     { 
      var disp = obs.Subscribe(ob); 
      Interlocked.Increment(ref count); 
      return new WrapDisposable(disp,() => 
      { 
       if (Interlocked.Decrement(ref count) == 0) 
       { 
        onAllDone();             
       } 
      }); 
     }); 
    } 

//用例:

Action cleanup; 
var obs = GenerateObs(out cleanup); 
var newObs = WrapToClean(obs, cleanup); 
newObs.Take(6).Subscribe(Console.WriteLine); 
newObs.Take(5).Subscribe(Console.WriteLine);