得到任何非错误元素的递延OCaml中/异步

问题描述:

假设我有一个功能,如:得到任何非错误元素的递延OCaml中/异步

query_server : Server.t -> string Or_error.t Deferred.t 

然后我产生递延查询列表:

let queries : string Or_error.t Deferred.t list = List.map servers ~f:query_server 

如何得到第一个查询不失败的结果(否则有一些错误)。基本上,我想要一个功能,如:

any_non_error : 'a Or_error.t Deferred.t list -> 'a Or_error.t 

此外,我不知道如何以某种方式聚合错误。也许我的功能需要一个额外的参数,如Error.t -> Error.t -> Error.t还是有标准的方法来组合错误?

一个简单的方法是使用Deferred.List,其中包含列表操作提升到异步monad中,基本上是Kleisli类中的容器接口。我们将尝试在每个服务器,以便直到第一个是准备好,例如,

let first_non_error = 
    Deferred.List.find ~f:(fun s -> query_server s >>| Result.is_ok) 

当然,这不是any_non_error,作为处理是连续的。此外,我们正在失去错误信息(尽管后者很容易修复)。

所以为了使其平行,我们将采用以下策略。我们将有两个延迟计算,第一个将并行运行所有查询,并等待所有查询都准备就绪,第二个将在收到Ok结果后立即确定。如果第一个发生在最后一个之前,那么这意味着所有服务器都失败了。所以我们试试:

let query_servers servers = 
    let a_success,got_success = Pipe.create() in 
    let all_errors = Deferred.List.map ~how:`Parallel servers ~f:(fun s -> 
     query_server s >>| function 
     | Error err as e -> e 
     | Ok x as ok -> Pipe.write_without_pushback x; ok) in 
    Deferred.any [ 
     Deferred.any all_errors; 
     Pipe.read a_success >>= function 
     | `Ok x -> Ok x 
     | `Eof -> assert false 
    ]