如何创建一个Rx observable,当最后一个观察者取消订阅时停止发布事件?
问题描述:
我会创建一个observable(通过各种方法)并将其返回给感兴趣的各方,但是当他们完成了侦听时,我想拆除可观察对象,以免它继续占用资源。另一种方式是将其视为在酒吧子系统中创建主题。当没有人再订阅某个主题时,您不想再围绕主题及其过滤。如何创建一个Rx observable,当最后一个观察者取消订阅时停止发布事件?
答
的Rx已经有运营商,以满足您的需求 - 以及两人竟 - Publish
& RefCount
。
下面是如何使用它们:
IObservable xs = ...
var rxs = xs.Publish().RefCount();
var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });
//later
sub1.Dispose();
//later
sub2.Dispose();
//The underlying subscription to `xs` is now disposed of.
简单。
答
如果我已经理解了你的问题,你想创建observable,这样当所有的订阅者都处理了它们的订阅,即没有更多的订阅者,那么你想执行一个清理功能,这将停止生产中的observable其他值。 如果这是你想要的,那么你可以这样做如下:
//Wrap a disposable
public class WrapDisposable : IDisposable
{
IDisposable disp;
Action act;
public WrapDisposable(IDisposable _disp, Action _act)
{
disp = _disp;
act = _act;
}
void IDisposable.Dispose()
{
act();
disp.Dispose();
}
}
//Observable that we want to clean up after all subs are done
public static IObservable<long> GenerateObs(out Action cleanup)
{
cleanup =() =>
{
Console.WriteLine("All subscribers are done. Do clean up");
};
return Observable.Interval(TimeSpan.FromSeconds(1));
}
//Wrap the observable
public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
{
int count = 0;
return Observable.CreateWithDisposable<T>(ob =>
{
var disp = obs.Subscribe(ob);
Interlocked.Increment(ref count);
return new WrapDisposable(disp,() =>
{
if (Interlocked.Decrement(ref count) == 0)
{
onAllDone();
}
});
});
}
//用例:
Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);