使用Rx处理可观察序列中的错误

问题描述:

如果发生错误,是否有方法让可观察序列继续执行,以及序列中的下一个元素? 从this post看起来您需要在Catch()中指定一个新的可观察序列以恢复执行,但是如果您需要继续使用序列中的下一个元素继续处理,那该怎么办?有没有办法做到这一点?使用Rx处理可观察序列中的错误

更新: 该场景如下: 我有一堆我需要处理的元素。处理由一堆步骤组成。我有 将这些步骤分解成我想编写的任务。 我遵循ToObservable()的指导原则here 将任务转换为构成的可观察对象。 所以基本上我在做somethng像这样 -

foreach(element in collection) 
{ 
    var result = from aResult in DoAAsync(element).ToObservable() 
     from bResult in DoBAsync(aResult).ToObservable() 
     from cResult in DoCAsync(bResult).ToObservable() 
     select cResult; 
    result.subscribe(register on next and error handlers here) 
} 

我也可以是这样的:

var result = 
     from element in collection.ToObservable() 
     from aResult in DoAAsync(element).ToObservable() 
     from bResult in DoBAsync(aResult).ToObservable() 
     from cResult in DoCAsync(bResult).ToObservable() 
     select cResult; 

什么是这里继续处理其他的元素,即使我们说的处理的最佳方式 其中一个元素引发异常。我希望能够记录错误并理想地继续前进。

IObservableIObserver之间的合同是OnNext*(OnCompelted|OnError)?,即使不是来源,所有运营商都支持该合同。

你唯一的选择就是重新订阅使用Retry源,但如果源返回IObservable实例每一个描述你将不会看到任何新的价值。

您能否提供有关您的方案的更多信息?也许有另一种方式来看待它。

编辑:基于更新后的反馈,这听起来像你只需要Catch

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>()) 
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>()) 
    select cResult; 

这将替换与Empty一个错误,就不会触发下一个序列(因为它使用SelectMany

+0

我已更新该帖子以包含我正在尝试完成的场景 – 2011-05-20 21:08:25

James James &理查德提出了一些很好的观点,但我认为他们没有给你最好的方法来解决你的问题

James建议使用.Catch(Observable.Never<Unit>())。当他说“将...允许流继续”时,他错了,因为一旦你遇到异常,流必须结束 - 这是理查德在提到观察者和观察者之间的合同时指出的。

此外,以这种方式使用Never将导致您的观测值永远不会完成。

简而言之,.Catch(Observable.Empty<Unit>())是将序列从一个以错误结尾的序列更改为以完成结束的序列的正确方法。

您已经达到了使用SelectMany来处理源集合的每个值的正确想法,以便您可以捕获每个异常,但是您留下了一些问题。

您正在使用任务(TPL)只是将函数调用转换为可观察值。这会强制您的observable使用任务池线程,这意味着SelectMany语句可能会以非确定性顺序生成值。

此外,您还隐藏实际调用来处理数据,使重构和维护变得更困难。

我认为你最好创建一个允许跳过异常的扩展方法。那就是:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector) 
{ 
    return 
     source 
      .Select(t => 
       Observable.Start(() => selector(t)).Catch(Observable.Empty<R>())) 
      .Merge(); 
} 

用这种方法你现在可以简单地这样做:

var result = 
    collection.ToObservable() 
     .SelectAndSkipOnException(t => 
     { 
      var a = DoA(t); 
      var b = DoB(a); 
      var c = DoC(b); 
      return c; 
     }); 

这段代码要简单得多,但它隐藏了异常(S)。如果你想继续让异常继续下去,那么你需要做一些额外的功能。在Materialize扩展方法中增加一些重载可以保持错误。

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector) 
{ 
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector); 
} 

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector) 
{ 
    Func<Notification<T>, Notification<R>> f = nt => 
    { 
     if (nt.Kind == NotificationKind.OnNext) 
     { 
      try 
      { 
       return Notification.CreateOnNext<R>(selector(nt.Value)); 
      } 
      catch (Exception ex) 
      { 
       ex.Data["Value"] = nt.Value; 
       ex.Data["Selector"] = selector; 
       return Notification.CreateOnError<R>(ex); 
      } 
     } 
     else 
     { 
      if (nt.Kind == NotificationKind.OnError) 
      { 
       return Notification.CreateOnError<R>(nt.Exception); 
      } 
      else 
      { 
       return Notification.CreateOnCompleted<R>(); 
      } 
     } 
    }; 
    return source.Select(nt => f(nt)); 
} 

这些方法允许你这样写:

var result = 
    collection 
     .ToObservable() 
     .Materialize(t => 
     { 
      var a = DoA(t); 
      var b = DoB(a); 
      var c = DoC(b); 
      return c; 
     }) 
     .Do(nt => 
     { 
      if (nt.Kind == NotificationKind.OnError) 
      { 
       /* Process the error in `nt.Exception` */ 
      } 
     }) 
     .Where(nt => nt.Kind != NotificationKind.OnError) 
     .Dematerialize(); 

你甚至可以链接这些Materialize方法和使用ex.Data["Value"] & ex.Data["Selector"]得到抛出错误出来的价值和选择功能。

我希望这会有所帮助。

+0

我在尝试使用observables观察值时遇到类似问题。当一个内部observable抛出一个OnError时,外部可观察的视图会随之移动到OnError - 从而导致一切关闭。我已经尝试了捕获异常并抛出OnCompleted的解决方案,但是这产生了与OnCompleted和OnError完全相同的行为,导致订阅关闭 – letstango 2014-04-15 23:18:05