Couchbase异步批错误处理
我想,我得从观测量+ Couchbase异步API脑爆炸:) 可能有人帮助我,好吗? 已经和批量操作打了几天,并且仍然无法理解如何通过适当的错误处理来完成批量操作。Couchbase异步批错误处理
比方说,我要更新Couchbase散装一些文件。 如果我使用同步API,它看起来像:
List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
try {
try {
item.content().put("age", 42);
bucket.replace(item);
} catch (CASMismatchException e) {
// retry
bucket.get(item.id()).content().put("age", 42);
bucket.replace(item);
}
} catch (Exception e) {
// handle error which doesn't stop execution for other items
// for example, add item id to list of failed items in response
errorHandler.handleError(item.id(), e);
}
}
但这不是平行的,和文档说异步API更有效。 我不能理解的是,如何通过建立这样的观测量流量,我想:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
// what to do? return another observable which does retry logic above?
// how do I know what item has failed?
// I don't have ID of that item, nor I can extract it from passed Exception
// why onErrorResumeNext is getting called only once (if one item fails)
// and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)
任何帮助将非常感激! 感谢
你可以做这样的事情:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async()
.replace(item)
.retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException)
.onErrorReturn(e -> {
errorHandler.handleError(item.id(), e);
return null; //or item, if you need the item further down the stream
})
.subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async()
})
.subscribeOn(<something>) //with this scheduler the put() method will be executed
.subscribe();
的想法是通过flatMap()
每个项目处理到一个单独的可观察分开,因为每个重试逻辑是单个项目,而不是整个流。 重试运营谓语,让您重试次数和异常,所以你的情况,我们只与特定CASMismatchException
例外重试的第一次,然后错误,我们可以简单地做onErrorReturn
和办理其他错误操作,你甚至可以返回该项目如果你想继续处理它。
有一点需要注意的是调度,我不知道如果Couchbase在默认情况下io()
做async()
通话时操作。同时,考虑到该行:
item.content().put("age", 42);
将在最后subscribeOn(),因为它会在主流订阅调度来完成执行。
感谢您的帮助!我会尽量遵循你的建议。我描述的情况非常简单,但是关于通过Observable项而不是列出Observable捕获错误的想法是我真正想到的,现在它很可能会解决我的问题。 – blackdigger
我想你最好需要通过Observable.create建立可观察与尝试捕捉然后直接重试,如果重试工作发出这个项目,如果没有则发出错误。 –