如何立即返回响应并异步处理请求

问题描述:

我正在寻找一种方法,能够立即向客户端返回响应,同时以异步方式处理请求。也就是说:如何立即返回响应并异步处理请求?

我只需要记录请求数据,启动一个新线程去调用其他服务器上的耗时操作(一些后端操作),并且无需等待它们的响应,就立即向客户端返回 200 状态码。

目前我尝试用 CompletableFuture 来实现这一点,但似乎遗漏了什么。

package com.example.controller; 

import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.core.Logger; 

import javax.servlet.http.HttpServletRequest; 
import javax.ws.rs.Consumes; 
import javax.ws.rs.GET; 
import javax.ws.rs.Path; 
import javax.ws.rs.core.Context; 
import javax.ws.rs.core.MediaType; 
import javax.ws.rs.core.Response; 
import java.util.concurrent.CompletableFuture; 

@Path("/class") 
public class AsynchronousResponse { 

private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName()); 
private static final int HTTP_STATUS_OK = 200; 
private static final String EMPTY_CONTENT = ""; 

@Context 
private HttpServletRequest httpRequest; 

@Path("/method") 
@GET 
@Consumes(MediaType.APPLICATION_JSON) 
public Response asyncResponseSupply() { 
    LOGGER.info(httpRequest.getSession().getId() + " : New session received"); 
    CompletableFuture.supplyAsync(this::veryExpensiveOperations); 
    LOGGER.info(httpRequest.getSession().getId() + " : Returning empty response... "); 
    return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build(); 
} 

// need to do some operations on data from httpRequest 
private String veryExpensiveOperations() { 
    LOGGER.info(httpRequest.toString()); 
    LOGGER.info("Start very expensive operations"); 
    try { 
     Thread.sleep(3000); 
     LOGGER.info("Finished"); 
     return "DONE"; 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
     e.printStackTrace(); 
     LOGGER.error("Error: " + e.getMessage()); 
     return "ERROR"; 
    } 
} 

} 

这样我确实能立即得到响应,但 veryExpensiveOperations() 方法中似乎丢失了 HttpRequest 的值。这很糟糕,因为我需要用客户端请求中的值去调用其他 Web 服务。感谢帮助!

我的应用程序使用的是 Jetty,版本 9.2.18.v20160721。

一旦完成向servlet的分派,响应将被提交,并且请求和响应对象将被回收。

按照 Servlet 规范,您必须使用 AsyncContext(通过调用 HttpServletRequest.startAsync() 获取),让容器知道请求/响应尚未处理完成,并且处理是在非分派线程中进行的(例如您的 CompletableFuture.supplyAsync() 调用)。

至于如何将 JAX-RS 与 AsyncContext 结合使用,我不清楚。我甚至不知道 JAX-RS 是否已经更新以支持 AsyncContext。

感谢 Joakim 的回答(我无法给它点赞,因为我是新用户,声望不够)。根据你关于 AsyncContext 的提示,我找到了一种快速返回响应的方法。修改后的代码如下(runAsync 和 supplyAsync 均可正常工作):

package com.example.controller; 

import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.core.Logger; 

import javax.servlet.AsyncContext; 
import javax.servlet.http.HttpServletRequest; 
import javax.ws.rs.Consumes; 
import javax.ws.rs.GET; 
import javax.ws.rs.Path; 
import javax.ws.rs.core.Context; 
import javax.ws.rs.core.MediaType; 
import javax.ws.rs.core.Response; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ForkJoinPool; 


/**
 * JAX-RS resource that replies to the client with an empty 200 response
 * straight away while the slow work continues on a background pool.
 *
 * The servlet request is put into asynchronous mode with
 * {@link HttpServletRequest#startAsync()} before the work is scheduled, and
 * the background task calls {@link AsyncContext#complete()} when it is done.
 */
@Path("/test")
public class AsynchronousResponse {

private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName());
private static final int HTTP_STATUS_OK = 200;
private static final String EMPTY_CONTENT = "";

/**
 * One pool shared by all requests. Building a new ForkJoinPool per request
 * and never shutting it down would leak its worker threads.
 */
private static final ForkJoinPool BACKGROUND_POOL = new ForkJoinPool(
  Runtime.getRuntime().availableProcessors(),
  ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  null,
  true);

@Context
private HttpServletRequest httpRequest;

// Set by the endpoint on the request thread and completed by the background task.
// NOTE(review): assumes one resource instance per request - confirm the resource scope.
private AsyncContext asyncContext;

/**
 * Schedules the expensive work with {@code supplyAsync} and replies immediately.
 *
 * @return an empty response with HTTP status 200
 */
@Path("/supply")
@GET
@Consumes(MediaType.APPLICATION_JSON)
public Response asyncResponseSupply() {
    String loggerPrefix = httpRequest.getSession().getId() + " :[SUPPLY] ";
    LOGGER.info(loggerPrefix + "New session received");
    LOGGER.info(loggerPrefix + "Active threads on controller init: " + Thread.activeCount());
    // Must happen before the task is scheduled: the task completes this context.
    asyncContext = httpRequest.startAsync();
    CompletableFuture.supplyAsync(this::veryExpensiveOperations, BACKGROUND_POOL);
    LOGGER.info(loggerPrefix + "Actual active Threads on controller return: " + Thread.activeCount());
    LOGGER.info(loggerPrefix + "Returning empty response... ");
    return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
}

/**
 * Schedules the expensive work with {@code runAsync} and replies immediately.
 *
 * @return an empty response with HTTP status 200
 */
@Path("/run")
@GET
@Consumes(MediaType.APPLICATION_JSON)
public Response asyncResponseSupplyRun() {
    String loggerPrefix = httpRequest.getSession().getId() + " :[RUN] ";
    LOGGER.info(loggerPrefix + "New session received");
    LOGGER.info(loggerPrefix + "Active threads on controller init: " + Thread.activeCount());
    // Must happen before the task is scheduled: the task completes this context.
    asyncContext = httpRequest.startAsync();
    CompletableFuture.runAsync(this::veryExpensiveOperations, BACKGROUND_POOL);
    LOGGER.info(loggerPrefix + "Actual active Threads on controller return: " + Thread.activeCount());
    LOGGER.info(loggerPrefix + "Returning empty response... ");
    return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
}

/**
 * Placeholder for the slow back-end work; it needs data from httpRequest.
 * Runs on a {@code BACKGROUND_POOL} thread and always completes the
 * asynchronous context, whether the work succeeds or fails.
 *
 * @return "DONE" after the three-second sleep, or "ERROR" if the sleep was interrupted
 */
private String veryExpensiveOperations() {
    try {
        String loggerPrefix = httpRequest.getSession().getId() + " :[veryExpensiveOperations] ";
        LOGGER.info(loggerPrefix + "Request toString: " + httpRequest.toString());
        LOGGER.info(loggerPrefix + "Start very expensive operations");

        try {
            Thread.sleep(3000);
            LOGGER.info(loggerPrefix + "Thread sleep finished");
            LOGGER.info(loggerPrefix + "Actual active Threads: " + Thread.activeCount());
            return "DONE";
        } catch (InterruptedException e) {
            // Restore the interrupt flag only when an interrupt actually happened.
            Thread.currentThread().interrupt();
            // Pass the exception itself so the stack trace goes to the log.
            LOGGER.error(loggerPrefix + "Error: " + e.getMessage(), e);
            return "ERROR";
        }
    } finally {
        // Always release the async request, even if an unchecked exception escapes.
        asyncContext.complete();
    }
}

}