Akka HTTP connection pool

Problem description:

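I'm trying to use a client-side connection pool for our Akka HTTP application. However, requests seem to hang once the maximum number of connections has been reached. I have condensed the problem to the following: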

import java.lang.Thread.UncaughtExceptionHandler 
import java.net.ServerSocket 
import akka.actor.ActorSystem 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} 
import akka.http.scaladsl.client.RequestBuilding._ 

import scala.annotation.tailrec 
import scala.util.{Success, Try} 

object AkkaProblem extends App { 
    val server = new ServerSocket(0) 
    val serverPort = server.getLocalPort 

    object responder extends Runnable with UncaughtExceptionHandler { 
     val cr = '\r' 
     val httpResponse = 
    s"""HTTP/1.1 404 Not Found$cr 
     |Content-Type: application/json;charset=UTF-8$cr 
     |Date: Mon, 26 Sep 2016 06:30:13 GMT$cr 
     |Connection: keep-alive$cr 
     |Transfer-Encoding: chunked$cr 
     |$cr 
     |12$cr 
     |{"Hello": "World"}$cr 
     |0$cr 
     |$cr 
     |""".stripMargin 

     override final def run(): Unit = { 
      val socket = server.accept() 
      @tailrec def sendResponse(): Unit = { 
       socket.getOutputStream.write(httpResponse.getBytes) 
       sendResponse() 
      } 

      sendResponse() 
     } 

     override def uncaughtException(t: Thread, e: Throwable): Unit =() 
    } 

    for (nr <- 1 to 4) { 
     val thread = new Thread(responder, s"response-thread-$nr") 
     thread.setUncaughtExceptionHandler(responder) 
     thread.setDaemon(true) 
     thread.start() 
    } 


    implicit val system = ActorSystem("main") 
    import system.dispatcher 
    implicit val mat = ActorMaterializer() 

    val serverUri = Uri(s"http://localhost:$serverPort") 
    val request = Get(serverUri) 

    val poolFlow: Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] = 
     Http().newHostConnectionPool(serverUri.authority.host.address, serverUri.authority.port, ConnectionPoolSettings("max-connections: 4")) 

    val source = Source.repeat(request).take(1000).map((_,())) 

    val runRequest = source.viaMat(poolFlow)(Keep.right).toMat(Sink.seq)(Keep.both) 
    val (connectionPool, response) = runRequest.run() 

    response.map(_.map(_._1)).andThen { 
    case Success(responses) => 
     val byResultType = responses.groupBy(_.isSuccess).mapValues(_.size) 

     println(s"Received response. Got ${byResultType.get(true)} successes, ${byResultType.get(false)} errors") 
     connectionPool.shutdown() andThen { 
     case done => 
      println("Connection pool shut down") 
      system.terminate() 
     } 
    } 
} 

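I would expect the program to report 1000 successes fairly quickly and then shut down. Instead, it hangs indefinitely. When the number of requests is lowered to match the number of allowed connections, the problem goes away.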

As a workaround we could use a separate pool for each connection, but that defeats the purpose of having a pool.

The stack dump shows no deadlocks or other obvious misbehaviour:

 
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1000m; support was removed in 8.0 
2016-09-26 13:24:18 
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode): 

"Attach Listener" #25 daemon prio=9 os_prio=31 tid=0x00007f86bf001000 nid=0x3307 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"main-akka.actor.default-dispatcher-10" #24 prio=5 os_prio=31 tid=0x00007f86bbb4a000 nid=0x6b03 waiting on condition [0x000000011d717000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-9" #23 prio=5 os_prio=31 tid=0x00007f86bc402800 nid=0x6903 waiting on condition [0x000000011d614000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-8" #22 prio=5 os_prio=31 tid=0x00007f86bbb49800 nid=0x6703 waiting on condition [0x000000011d511000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-7" #21 prio=5 os_prio=31 tid=0x00007f86bb292000 nid=0x6503 waiting on condition [0x000000011d40e000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.io.pinned-dispatcher-6" #20 prio=5 os_prio=31 tid=0x00007f86bcbcd000 nid=0x6407 runnable [0x000000011d10b000] 
    java.lang.Thread.State: RUNNABLE 
     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) 
     at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) 
     at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103) 
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) 
     - locked (a sun.nio.ch.Util$2) 
     - locked (a java.util.Collections$UnmodifiableSet) 
     - locked (a sun.nio.ch.KQueueSelectorImpl) 
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) 
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101) 
     at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:115) 
     at akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:219) 
     at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:148) 
     at akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:67) 
     at akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:71) 
     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

"DestroyJavaVM" #19 prio=5 os_prio=31 tid=0x00007f86bcba2800 nid=0xd03 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"main-akka.actor.default-dispatcher-5" #18 prio=5 os_prio=31 tid=0x00007f86bc252800 nid=0x5d03 waiting on condition [0x000000011c488000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-4" #17 prio=5 os_prio=31 tid=0x00007f86bc823800 nid=0x5b03 waiting on condition [0x000000011c185000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-3" #16 prio=5 os_prio=31 tid=0x00007f86bba26000 nid=0x5903 waiting on condition [0x000000011c082000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-2" #15 prio=5 os_prio=31 tid=0x00007f86bc256000 nid=0x5703 waiting on condition [0x000000011bf7f000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-scheduler-1" #14 prio=5 os_prio=31 tid=0x00007f86bc248800 nid=0x5503 waiting on condition [0x000000011b8d2000] 
    java.lang.Thread.State: TIMED_WAITING (sleeping) 
     at java.lang.Thread.sleep(Native Method) 
     at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:87) 
     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:268) 
     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-4" #13 daemon prio=5 os_prio=31 tid=0x00007f86bc195000 nid=0x5303 runnable [0x000000011b7cf000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-3" #12 daemon prio=5 os_prio=31 tid=0x00007f86bb0fa800 nid=0x5103 runnable [0x000000011b6cc000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-2" #11 daemon prio=5 os_prio=31 tid=0x00007f86bb9ca000 nid=0x4f03 runnable [0x000000011b5c9000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-1" #10 daemon prio=5 os_prio=31 tid=0x00007f86bb9c1000 nid=0x4d03 runnable [0x000000011b4c6000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"Monitor Ctrl-Break" #9 daemon prio=5 os_prio=31 tid=0x00007f86bb078000 nid=0x4b03 runnable [0x000000011b101000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.PlainSocketImpl.socketAccept(Native Method) 
     at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409) 
     at java.net.ServerSocket.implAccept(ServerSocket.java:545) 
     at java.net.ServerSocket.accept(ServerSocket.java:513) 
     at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:79) 
     at java.lang.Thread.run(Thread.java:745) 

"Service Thread" #8 daemon prio=9 os_prio=31 tid=0x00007f86bc810800 nid=0x4703 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007f86bc805800 nid=0x4503 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007f86bb866000 nid=0x4303 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007f86bb833800 nid=0x4103 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007f86bb844000 nid=0x3e23 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f86bc001000 nid=0x2b03 in Object.wait() [0x0000000118c24000] 
    java.lang.Thread.State: WAITING (on object monitor) 
     at java.lang.Object.wait(Native Method) 
     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) 
     - locked (a java.lang.ref.ReferenceQueue$Lock) 
     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) 
     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) 

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f86bb81f000 nid=0x2903 in Object.wait() [0x0000000118b21000] 
    java.lang.Thread.State: WAITING (on object monitor) 
     at java.lang.Object.wait(Native Method) 
     at java.lang.Object.wait(Object.java:502) 
     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157) 
     - locked (a java.lang.ref.Reference$Lock) 

"VM Thread" os_prio=31 tid=0x00007f86bb014800 nid=0x2703 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007f86bc005000 nid=0x1f03 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007f86bc005800 nid=0x2103 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007f86bc006800 nid=0x2303 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007f86bc007000 nid=0x2503 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007f86bb80c000 nid=0x4903 waiting on condition 

JNI global references: 250 

Sounds like a deadlock. Do you get anything interesting from 'jstack' while it hangs? – Rich

(You could use Akka HTTP instead of sockets; it is non-blocking.) – Rich

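Thanks for the suggestion. I chose the socket-based example mainly to strip as much "magic" from the problem as possible. Our actual solution will use Akka HTTP as the server. This is just the bare basics. :)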

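You need to explicitly consume the entity (the body). The response's entity is actually a stream, so if you don't consume it, the connection stays open and is never handed back to the pool. The documentation describes the request-response cycle in detail. Either the server must send Connection: close in the headers, or you must attach some Sink (e.g. Sink.ignore) to consume the stream.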
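As a rough sketch of the Sink.ignore approach, the stage below could be placed right after the pool flow. The names DrainEntities and drainEntity are hypothetical helpers invented for this illustration, not part of Akka HTTP, and the sketch assumes the status code is all you need from each response.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{HttpResponse, StatusCode}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

import scala.util.Try

object DrainEntities {
  // Hypothetical helper (not an Akka HTTP API): for every successful response,
  // start draining the entity into Sink.ignore and pass on only the status code.
  // Failed requests are passed through unchanged as Failure.
  def drainEntity[T](implicit mat: Materializer)
      : Flow[(Try[HttpResponse], T), (Try[StatusCode], T), NotUsed] =
    Flow[(Try[HttpResponse], T)].map { case (tried, ctx) =>
      val status = tried.map { response =>
        // Reads the body stream and throws the bytes away. The returned
        // Future[Done] is deliberately not awaited in this sketch.
        response.entity.dataBytes.runWith(Sink.ignore)
        response.status
      }
      (status, ctx)
    }
}
```

In the pipeline from the question this would presumably read source.viaMat(poolFlow)(Keep.right).via(DrainEntities.drainEntity[Unit]).toMat(Sink.seq)(Keep.both), with the implicit materializer already in scope there.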

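In fact, there are several ways to handle an HttpResponse. One is to call the HttpResponse's toStrict(timeout: FiniteDuration) method, which reads the whole entity into memory and thereby frees the connection. The timeout bounds how long to wait for the sender to deliver the entity. If you are not interested in the entity, you can instead call discardEntityBytes() on the HttpResponse. Finally, you can consume the stream with any other valid consumer, for example by unmarshalling it with Unmarshal(resp.entity).to[SomeClass].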
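The three options could look roughly like this. The object and method names (ConsumeEntity, strict, discard, bodyAsString) and the 3-second timeout are assumptions chosen for illustration; String stands in for SomeClass because Akka HTTP ships an unmarshaller for it.

```scala
import akka.Done
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object ConsumeEntity {
  // 1. Buffer the whole entity in memory. The future fails if the entity
  //    has not fully arrived within the timeout (3 seconds is arbitrary).
  def strict(resp: HttpResponse)(implicit ec: ExecutionContext, mat: Materializer): Future[HttpResponse] =
    resp.toStrict(3.seconds)

  // 2. Not interested in the body: read it and drop the bytes.
  //    The future completes once the entity has been fully discarded.
  def discard(resp: HttpResponse)(implicit mat: Materializer): Future[Done] =
    resp.discardEntityBytes().future

  // 3. Consume the body by unmarshalling it, here into a plain String.
  def bodyAsString(resp: HttpResponse)(implicit ec: ExecutionContext, mat: Materializer): Future[String] =
    Unmarshal(resp.entity).to[String]
}
```

Whichever variant is used, each response should be consumed exactly once; a streamed entity generally cannot be read a second time.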