Rx:使用异步函数进行订阅并忽略错误
问题描述:
我想调用observable中每个项目的异步函数。如回答here,解决方案是使用SelectMany
。但是,如果异步方法抛出,订阅将终止。我有以下解决方案,这似乎工作:Rx:使用异步函数进行订阅并忽略错误
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.Catch(Observable.Empty<string>()));
是否有更多的习惯解决方案?
答
有一种标准的方式,以便能够观察到发生在您的通话RunAsync
例外,而这使用.Materialize()
。
的.Materialize()
方法变成一个IObservable<T>
序列为IObservable<Notification<T>>
序列在那里你可以推理对OnNext
,OnError
和OnCompleted
电话。
我写此查询:
var obs = Observable.Range(0, 10);
obs
.SelectMany(x =>
Observable
.FromAsync(() => RunAsync())
.Materialize())
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
.Subscribe(x => x.Dump());
有了这个支持代码:
private int counter = 0;
private Random rnd = new Random();
private System.Threading.Tasks.Task<string> RunAsync()
{
return System.Threading.Tasks.Task.Factory.StartNew(() =>
{
System.Threading.Interlocked.Increment(ref counter);
if (rnd.NextDouble() < 0.3)
{
throw new Exception(counter.ToString());
}
return counter.ToString();
});
}
当我运行它,我得到这样的输出:
2
4
5
1!
6
7
3!
10
8!
9
每条线结束于!
是致电RunAsync
,导致一个例外。
答
您也可以使用OnErrorResumeNext。
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.OnErrorResumeNext(Observable.Empty<string>()));
没错,但Catch让我有机会记录异常。 – 2015-04-03 10:22:14
这是真的,但也可以用Do()来实现。 无论如何,catch可能是更好的选择,因为OnErrorResumeNext在成功终止的情况下也会执行空序列。 – treze 2015-04-03 10:47:49