阿卡流+阿卡的Http - 获取上的错误

问题描述:

请求我有以下流是非常有效:阿卡流+阿卡的Http - 获取上的错误

source 
    .map(x => HttpRequest(uri = x.rawRequest)) 
    .via(Http().outgoingConnection(host, port)) 
    .to(Sink.actorRef(myActor, IsDone)) 
    .run() 

和一个简单的演员流完成时处理响应状态,并最终消息:

/** 
    * A simple actor to count how many rows have been processed 
    * in the complete process given a http status 
    * 
    * It also finish the main thread upon a message of type [[IsDone]] is received 
    */ 
class MyActor extends Actor with ActorLogging { 

    var totalProcessed = 0 

    def receive = LoggingReceive { 

    case response: HttpResponse => 

     if(response.status.isSuccess()) { 
     totalProcessed = totalProcessed + 1 
     } else if(response.status.isFailure()) { 
     log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}") 
     } else { 
     log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}") 
     } 

    case IsDone => 
     println(s"total processed: $totalProcessed") 
     sys.exit() 
    } 
} 

case object IsDone 

我不知道这是否是处理事件并处理响应状态的最佳方法,但目前为止还是有效的。

问题是如何将原始请求传递给演员,以我能够知道什么请求导致了特定错误的方式。

我的演员可以期待以下代替:

case (request: String, response: HttpResponse) => 

但如何传递,我有我的管道开始的信息?

我想对map这样的:

source 
    .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest)) 

但我对如何激发中的HTTP流不知道。

有什么建议吗?

+2

使用主机连接池,而不是尝试每个请求明确打开传出连接。该模型需要每个请求的任意标识符,然后返回响应,以便您可以正确地关联请求和响应 – cmbaxter

+0

Hi @cmbaxter您是否打算使用此示例? http://doc.akka.io/docs/akka/2.4.4/scala/http/client-side/host-level.html#Example但用String代替? –

随着@cmbaxter的帮助下,我可以使用下面的代码解决我的问题:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port) 

source 
    .map(url => HttpRequest(uri = url) -> url) 
    .via(poolClientFlow) 
    .to(Sink.actorRef(myActor, IsDone)) 
    .run() 

现在我的演员是能够接收这样的:

case (respTry: Try[HttpResponse], request: String) =>