多个http请求的Akka流

问题描述:

在我的一个项目中,我有一个akka actor向我的谷歌fcm服务器发送请求。参与者需要一个ID列表,并且应该尽可能多地提出请求。我在runForeach(println(_))中打印出服务器的响应,但我只收到一个打印输出的整个ID列表。为什么会发生?多个http请求的Akka流

class FCMActor(val key: String) extends Actor{ 
    import fcm.FCMActor._ 
    import akka.pattern.pipe 
    import context.dispatcher 

    private implicit def system: ActorSystem = ActorSystem() 
    final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system)) 

    def buildBody(id: Option[String]): String = { 
    Json.obj(
     "to" -> id, 
     "priority" -> "high", 
     "data" -> Json.obj("message" -> "Firebase Clud Message"), 
     "time_to_live" -> 60 
    ).toString() 
    } 

    def buildHttpRequest(body: String): HttpRequest = { 
    HttpRequest(method = HttpMethods.POST, 
     uri = s"/fcm/send", 
     entity = HttpEntity(MediaTypes.`application/json`, body), 
     headers = List(RawHeader("Authorization", s"key=$key"))) 
    } 

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = { 
    Http().outgoingConnection("fcm.googleapis.com") 
    } 

    def send(ids: List[Option[String]]) = { 

    val httpRequests: List[HttpRequest] = ids.map(buildBody).map(buildHttpRequest) 
    println(httpRequests) 

    Source(httpRequests).via(connectionFlow).runForeach(println(_)) // << here I only get one println 
    } 

    override def receive: Receive = { 
    case SendToIds(ids: List[Option[String]]) => 
     send(ids) 

    } 
} 
+0

我假设你的东西线只从流中获得一个响应?也许添加实际的日志将有助于澄清这一点。 –

+0

@StefanoBonetti这就是我的意思。我只在流程中得到一个响应'HttpResponse(200 OK,List(日期:星期五,2016年12月30日15:10:43 GMT,到期日:星期五,2016年12月30日15:10:43 GMT,Cache-Control:private ,max-age = 0,X-Content-Type-Options:nosniff,X-Frame-Options:SAMEORIGIN,X-XSS-Protection:1; mode = block,Server:GSE,Accept-Ranges:none,Vary:Accept -Encoding),HttpEntity.Chunked(application/json),HttpProtocol(HTTP/1.1))'我希望这就是你对日志的含义。 – Lukasz

您没有使用服务器向您发送的响应实体。要了解为什么这很重要,请查看相关的docs page

快速代码更改,试图解决这个问题是:

... .runForeach{ response => 
    response.discardEntityBytes() 
    println(response) 
} 

或者,如果你在实体真正感兴趣的,沿着

... .runForeach{ _.entity.dataBytes 
    .runFold(ByteString.empty) { case (acc, b) => acC++ b } 
    .map(println(_)) 
}