阿卡流+阿卡的Http - 获取上的错误
问题描述:
请求我有以下流是非常有效:阿卡流+阿卡的Http - 获取上的错误
source
.map(x => HttpRequest(uri = x.rawRequest))
.via(Http().outgoingConnection(host, port))
.to(Sink.actorRef(myActor, IsDone))
.run()
和一个简单的演员流完成时处理响应状态,并最终消息:
/**
* A simple actor to count how many rows have been processed
* in the complete process given a http status
*
* It also finish the main thread upon a message of type [[IsDone]] is received
*/
class MyActor extends Actor with ActorLogging {
var totalProcessed = 0
def receive = LoggingReceive {
case response: HttpResponse =>
if(response.status.isSuccess()) {
totalProcessed = totalProcessed + 1
} else if(response.status.isFailure()) {
log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
} else {
log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
}
case IsDone =>
println(s"total processed: $totalProcessed")
sys.exit()
}
}
case object IsDone
我不知道这是否是处理事件并处理响应状态的最佳方法,但目前为止还是有效的。
问题是如何将原始请求传递给演员,以我能够知道什么请求导致了特定错误的方式。
我的演员可以期待以下代替:
case (request: String, response: HttpResponse) =>
但如何传递,我有我的管道开始的信息?
我想对map
这样的:
source
.map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))
但我对如何激发中的HTTP流不知道。
有什么建议吗?
答
随着@cmbaxter的帮助下,我可以使用下面的代码解决我的问题:
val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)
source
.map(url => HttpRequest(uri = url) -> url)
.via(poolClientFlow)
.to(Sink.actorRef(myActor, IsDone))
.run()
现在我的演员是能够接收这样的:
case (respTry: Try[HttpResponse], request: String) =>
使用主机连接池,而不是尝试每个请求明确打开传出连接。该模型需要每个请求的任意标识符,然后返回响应,以便您可以正确地关联请求和响应 – cmbaxter
Hi @cmbaxter您是否打算使用此示例? http://doc.akka.io/docs/akka/2.4.4/scala/http/client-side/host-level.html#Example但用String代替? –