阿卡流+阿卡-HTTP生命周期
TLDR:是更好兑现每个请求一个流(即使用短寿命流)或使用跨请求单个流物化,当我有传出http请求作为一部分流的?阿卡流+阿卡-HTTP生命周期
详细信息:我有一个典型的服务,需要一个HTTP请求时,它驱散几个第三方服务的下游(不是由我控制),然后送回前汇总的结果。我正在使用akka-http进行客户端实施并喷洒服务器(遗留下来,随着时间的推移将会转移到akka-http)。示意性地:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
这可以通过物化每个请求的数据流或物化(部分)流一次,跨请求共享来实现任一。
实现每个请求会导致实现开销并且不清楚如何利用连接池。这个问题被描述为here(很多实现可能会耗尽池)。我可以将一个池包装在一个像here这样长时间运行的http流中,并将其包装在一个mapAsync
“上游”中,但错误处理策略对我来说并不明确。当单个请求失败并且流被终止时,它是否也会取下池?更多的是,我似乎需要协调请求和响应,因为它们没有按顺序返回。跨请求
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
共享流具有错误处理类似的问题 - 似乎有一些可以降低该流在飞行中的所有请求的故障模式。该代码将类似于host level API,但队列面向整个流。
哪种方式在这种情况下更好?
我曾尝试实现这两个解决方案,但也有许多设计选择在实施的每一个阶段,所以它似乎很容易搞砸了即使是在“正确”的道路。
尽管我认为它可以忽略不计,并且它与akka-http服务器的运行方式相同。
一般来说,是更好使用一个连接Flow
和调度所有的请求通过单流。主要原因是由于新实现可能实际上每次都会形成新的Connection
(取决于连接池设置)。
你是正确的,这会导致一些并发症:
订购:通过提供随机UUID
作为元组 要传递到连接第二流量值,你可以消灭你的能力将请求与响应相关联。在数组中的额外T
值可以被用作“相关ID”知道哪些HttpResponse
你是从流量获取。在您的特定例如,你可以使用来自Range
您创建的初始Int
:
val responseSource : Source[(Try[HttpResponse], Int), _] =
Source
.fromIterator(() => Iterator range (0,5))
.map(i => HttpRequest(...) -> i)
.via(connectionFlow)
现在每个响应带有你可以用它来处理响应原来的int值。
错误处理:您在声明“单个请求失败并且流已终止”时不正确。单个请求失败不一定会导致流失败。相反,您只需从连接流中获取(Failure(exception), Int)
值。您现在知道哪个Int导致了失败,并且您有流程中的异常。
谢谢拉蒙!我的问题是我没有一个'Source',每个输入流都是http服务的请求,所以最多我有'Source.single'。在这种情况下,我将如何获得单一流量? – Tim
此外,我将如何处理单个流中的请求超时?我能够找到的唯一超时是'completionTimeout'阶段,但它失败了一个流,不会向下游传播错误。 – Tim