如何立即返回响应并异步处理请求
问题描述:
我想找到一种方式,在异步处理请求的同时立即向客户端返回响应。
我只需要记录请求数据,启动一个新线程去调用其他服务器上的耗时操作(一些后端操作),并且无需等待它们的响应,立即向客户端返回 200 状态码。
目前我尝试用 CompletableFuture 来实现,但似乎遗漏了什么。
package com.example.controller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletableFuture;
/**
 * JAX-RS resource that replies with an empty HTTP 200 right away and leaves the
 * expensive work to a background task started with {@link CompletableFuture}.
 */
@Path("/class")
public class AsynchronousResponse {
    private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName());
    private static final int HTTP_STATUS_OK = 200;
    private static final String EMPTY_CONTENT = "";

    /** Servlet request injected by the JAX-RS runtime. */
    @Context
    private HttpServletRequest httpRequest;

    /**
     * Starts the expensive work asynchronously and returns without waiting for it.
     *
     * @return a 200 response with an empty entity
     */
    @Path("/method")
    @GET
    @Consumes(MediaType.APPLICATION_JSON)
    public Response asyncResponseSupply() {
        LOGGER.info(httpRequest.getSession().getId() + " : New session received");

        // Fire and forget: the returned future is deliberately discarded.
        CompletableFuture.supplyAsync(() -> veryExpensiveOperations());

        LOGGER.info(httpRequest.getSession().getId() + " : Returning empty response... ");
        return Response
                .status(HTTP_STATUS_OK)
                .entity(EMPTY_CONTENT)
                .build();
    }

    /**
     * Stand-in for the expensive back-end call; needs to do some operations on data
     * from {@code httpRequest}.
     *
     * <p>NOTE(review): this reads the injected request from the asynchronous thread,
     * possibly after {@link #asyncResponseSupply()} has already returned.
     *
     * @return {@code "DONE"} when the sleep finishes, {@code "ERROR"} when the thread
     *         is interrupted while sleeping
     */
    private String veryExpensiveOperations() {
        LOGGER.info(httpRequest.toString());
        LOGGER.info("Start very expensive operations");

        String outcome;
        try {
            Thread.sleep(3000);
            LOGGER.info("Finished");
            outcome = "DONE";
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            interrupted.printStackTrace();
            LOGGER.error("Error: " + interrupted.getMessage());
            outcome = "ERROR";
        }
        return outcome;
    }
}
这样我确实能立即得到响应,但 veryExpensiveOperations() 方法似乎丢失了 HttpRequest 中的值。这很糟糕,因为我需要用客户端请求中的值去调用其他 Web 服务。感谢帮助!
我的应用程序使用的 Jetty 版本是 9.2.18.v20160721。
答
一旦对 servlet 的分派(dispatch)完成,响应就会被提交,请求和响应对象也会被回收。
按照 Servlet 规范,您必须使用 AsyncContext(通过调用 HttpServletRequest.startAsync() 获取),让容器知道请求/响应尚未处理完毕,并且处理是在非分派线程中进行的(例如您的 CompletableFuture.supplyAsync() 调用)。
至于如何将 JAX-RS 与 AsyncContext 结合使用,我不清楚。我甚至不知道 JAX-RS 是否已经更新以支持 AsyncContext。
答
感谢 Joakim 的回答(我没法给它点赞,因为我是新用户,声望还不够)。根据你关于 AsyncContext 的提示,我找到了一种快速返回响应的方法。下面是我修改后的代码(runAsync 和 supplyAsync 均可正常工作):
package com.example.controller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
/**
 * JAX-RS resource that answers immediately with an empty HTTP 200 while the
 * expensive work continues on a background thread.
 *
 * <p>Each endpoint switches the servlet request into asynchronous mode with
 * {@link HttpServletRequest#startAsync()} before handing the work to the pool; the
 * worker calls {@link AsyncContext#complete()} once it has finished.
 */
@Path("/test")
public class AsynchronousResponse {
    private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName());
    private static final int HTTP_STATUS_OK = 200;
    private static final String EMPTY_CONTENT = "";
    private static final long SIMULATED_WORK_MILLIS = 3000L;

    /**
     * Single pool shared by all requests, so that a fresh pool (and fresh worker
     * threads) is not created on every call.
     */
    private static final ForkJoinPool WORKER_POOL = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors(),
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,
            true);

    /** Servlet request injected by the JAX-RS runtime; only read on the request thread. */
    @Context
    private HttpServletRequest httpRequest;

    /**
     * Starts the expensive work with {@link CompletableFuture#supplyAsync} and returns
     * without waiting for it.
     *
     * @return a 200 response with an empty entity
     */
    @Path("/supply")
    @GET
    @Consumes(MediaType.APPLICATION_JSON)
    public Response asyncResponseSupply() {
        String loggerPrefix = httpRequest.getSession().getId() + " :[SUPPLY] ";
        logRequestReceived(loggerPrefix);

        // Local, effectively final context: nothing mutable is shared with the worker.
        AsyncContext asyncContext = httpRequest.startAsync();
        CompletableFuture.supplyAsync(() -> veryExpensiveOperations(asyncContext), WORKER_POOL);

        return emptyOkResponse(loggerPrefix);
    }

    /**
     * Starts the expensive work with {@link CompletableFuture#runAsync} and returns
     * without waiting for it.
     *
     * @return a 200 response with an empty entity
     */
    @Path("/run")
    @GET
    @Consumes(MediaType.APPLICATION_JSON)
    public Response asyncResponseSupplyRun() {
        String loggerPrefix = httpRequest.getSession().getId() + " :[RUN] ";
        logRequestReceived(loggerPrefix);

        AsyncContext asyncContext = httpRequest.startAsync();
        CompletableFuture.runAsync(() -> veryExpensiveOperations(asyncContext), WORKER_POOL);

        return emptyOkResponse(loggerPrefix);
    }

    /** Logs the arrival of a request together with the current active-thread count. */
    private void logRequestReceived(String loggerPrefix) {
        LOGGER.info(loggerPrefix + "New session received");
        LOGGER.info(loggerPrefix + "Active threads on controller init: " + Thread.activeCount());
    }

    /** Logs that the endpoint is returning and builds the empty 200 response. */
    private Response emptyOkResponse(String loggerPrefix) {
        LOGGER.info(loggerPrefix + "Actual active Threads on controller return: " + Thread.activeCount());
        LOGGER.info(loggerPrefix + "Returning empty response... ");
        return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
    }

    /**
     * Background task: needs to do some operations on data from the HTTP request.
     *
     * <p>The request is taken from the {@link AsyncContext} instead of the injected
     * field, and the context is completed in {@code finally} so that it is completed
     * on every exit path, including unexpected runtime exceptions.
     *
     * @param asyncContext context returned by {@code startAsync()} for this request
     * @return {@code "DONE"} on success, {@code "ERROR"} if the thread was interrupted
     */
    private String veryExpensiveOperations(AsyncContext asyncContext) {
        try {
            HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
            String loggerPrefix = request.getSession().getId() + " :[veryExpensiveOperations] ";
            LOGGER.info(loggerPrefix + "Request toString: " + request.toString());
            return simulateExpensiveWork(loggerPrefix);
        } finally {
            asyncContext.complete();
        }
    }

    /**
     * Placeholder for the real back-end call: sleeps for a fixed time.
     *
     * @param loggerPrefix prefix prepended to every log line
     * @return {@code "DONE"} on success, {@code "ERROR"} if the thread was interrupted
     */
    private String simulateExpensiveWork(String loggerPrefix) {
        LOGGER.info(loggerPrefix + "Start very expensive operations");
        try {
            Thread.sleep(SIMULATED_WORK_MILLIS);
            LOGGER.info(loggerPrefix + "Thread sleep finished");
            LOGGER.info(loggerPrefix + "Actual active Threads: " + Thread.activeCount());
            // No interrupt() here: the work succeeded and the pooled thread must stay clean.
            return "DONE";
        } catch (InterruptedException e) {
            // Restore the flag so the pool can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error(loggerPrefix + "Error: " + e.getMessage(), e);
            return "ERROR";
        }
    }
}