RxJava错误处理热点观察到
我是很新,RxJava和有图案等 我使用下面的代码创建一个可观察到的一些问题:RxJava错误处理热点观察到
public Observable<Volume> getVolumeObservable(Epic epic) {
return Observable.create(event -> {
try {
listeners.add(streamingAPI.subscribeForChartCandles(epic.getName(), MINUTE, new HandyTableListenerAdapter() {
@Override
public void onUpdate(int i, String s, UpdateInfo updateInfo) {
if (updateInfo.getNewValue(CONS_END).equals(ONE)) {
event.onNext(new Volume(Integer.parseInt(updateInfo.getNewValue(LAST_TRADED_VOLUME))));
}
}
}));
} catch (Exception e) {
LOG.error("Error from volume observable", e);
}
});
}
一切工作正常,但我有一些关于错误处理的问题。 如果我理解正确,这将被视为“热点观察”,即无论订阅与否,事件都会发生(onUpdate是由我无法控制的远程服务器使用的回调)。
我选择不要在这里调用onError,因为我不希望observable在单个异常情况下停止发射事件。有没有更好的模式可供使用? .retry()出现在脑海中,但我不确定这是否适合热门的可观察性?
另外,在创建订阅时,但在第一个onNext被调用之前,observable如何表示?它只是一个Observable.empty()
1)你的可观察性不是热。区分因素是多个订户是否共享相同的订阅。 Observable.create()
为每个用户调用订阅功能,即它是冷。
虽然很容易使它热。只需添加share()
运营商。它将订阅第一位订阅者并取消订阅最后一位订阅者。不要忘记实现退订功能,像这样的东西:
event.setCancellable(() -> listeners.remove(...));
2)错误可能是可恢复和无法恢复。
如果你认为一个错误是可以自我修复的(不需要你的行为),你不应该打电话onError
,因为这会杀死你的可观察的(不会发生进一步的事件)。您可能会通过发出特殊的Volume
消息并附上错误详细信息来通知您的订户。
如果错误是致命的,例如你没有添加监听器,所以可能没有更多的消息,你不应该默默地忽略这个。发射onError
因为你的可观察性无论如何都不起作用。
如果错误需要您采取措施,通常是重试或超时重试,您可以添加retryXxx()
运算符中的一个。在create()
之后执行此操作,但在之前执行share()
。
3)Observable
是具有subscribe()
方法的对象。它是如何表示取决于你创建它的方法。例如,请参阅源代码create()
。
你认为错误来自哪里?从'listeners.add()'或者'onUpdate()'?你能给出一个错误情况的例子,你想要通知订阅者。 –
我猜你有点误解热/冷Observable。这并不热,每个用户都有自己的监听器来发送事件。即使你没有注销你的听众处置。由于Observable.create机制,observable在处置后不会发出事件。 –
可能是listeners.add()和onUpdate()。不幸的是,我使用的API是非常不明确的。 – Daniel