项目中- 多线程异步 Future 使用

一. 项目中架构要求:

        外围客户端系统 -  交易中心微服务(场景微服务) - 交易集市微服务/能力中心 - ESB - 后方系统(理财/基金等)。

交易中心需要同时调用交易集市十几个组件/接口。由于通讯时间太长和接口请求太多,考虑使用多线程。

         考虑使用非阻塞的多线程类 Future。Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。

二. 项目使用是银行内部代码,不便于展示,当时的案例demo如下 ,亲测。

 

2.1 配置类。

 

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
class ThreadPoolConfig {
    // @Bean 指定类标识,默认为 类的首字母小写
    @Bean("taskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(5);
        // 设置最大线程数
        executor.setMaxPoolSize(10);
        // 设置队列容量
        executor.setQueueCapacity(20);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("mynah886-test-async-threads-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

 

 

2.2 模拟出入参数类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskInfo {
    private String taskId;
    private String taskName;
}

 

2.3  模拟实现逻辑

2.3.1 控制层类

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
@RestController
@RequestMapping("/async")
public class AsyncController {
    private Logger logger = LoggerFactory.getLogger(AsyncController.class);
    @Autowired
    private TaskLogService taskLogService;

    @GetMapping(value = "/task" )
    public String taskExecute(){
        TaskInfo taskInfo1 = new TaskInfo("task1", "taskName001");
        TaskInfo taskInfo2 = new TaskInfo("task2", "taskName001");
        long startTime = System.currentTimeMillis();
        Future<TaskInfo> future1 = null;
        Future<TaskInfo> future2 = null;
        try {
            future1 = taskLogService.insertTaskLog(taskInfo1);
            future2 = taskLogService.updateTaskLog(taskInfo2);
            // 异步线程池执行  等待执行完成  isDone() 进行判断
            /*while (true) {
                if (future1.isDone() && future2.isDone()) {
                    System.out.println("异步任务一、二已完成");
                    break;
                }
            }*/
        } catch (Exception e) {
            log.debug("执行异步任务异常 {}" + e.getMessage());
        }
        /* V get(long timeout, TimeUnit unit)  设置取结果超时时间 */
        TaskInfo result1 = null;
        TaskInfo result2 = null;
        try {
            result1 = future1.get(3, TimeUnit.SECONDS);
            result2 = future2.get();
            log.debug("任务一result1 == " + result1  + "   任务二result2 == " + result2);
        } catch (InterruptedException e) {
            log.debug("InterruptedException: {}", e.getMessage());
        } catch (ExecutionException e) {
            log.debug("ExecutionException: {}", e.getMessage());
        } catch (TimeoutException e) {
            log.debug("接口get取值超时: {}", e.getMessage());
        }
        long endTime = System.currentTimeMillis();
        log.debug("异步任务总耗时: " + (endTime - startTime));
        return result1 + " --- " + result2;
    }
}

2.3.2 接口类

import java.util.concurrent.Future;
public interface TaskLogService {
    Future<TaskInfo> insertTaskLog(TaskInfo taskInfo) throws InterruptedException;
    Future<TaskInfo> updateTaskLog(TaskInfo taskInfo) throws InterruptedException;
}

2.3.3 实现类

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
@Service
public class TaskLogServiceImpl implements TaskLogService {

    /**
     * 需要进行多线程任务的方法
     *    1. 方法注解  @Async("taskExecutor") ,括号指定 线程池 名称
     *    2. 方法返回值类型为 AsyncResult<T>
     * 注意: 方法返回值类型由 Future 进行包装,即 Future<T>, 返回对象为 AsyncResult<T>
     * @param taskInfo
     * @return Future<String>
     * @throws InterruptedException
     */
    @Override
    @Async("taskExecutor")
    public Future<TaskInfo> insertTaskLog(TaskInfo taskInfo) throws InterruptedException {
        System.out.println("1---------currentThread: " + Thread.currentThread() );
        System.out.println("任务一  Thread Sleep 2s Start, " + taskInfo.getTaskId());
        Thread.sleep(2000);
        taskInfo.setTaskId("001-testId");
        taskInfo.setTaskName("001-teatName");
        System.out.println("任务一  Thread Sleep 2s End, " + taskInfo.getTaskId());
        return new AsyncResult<>( taskInfo );
    }
    @Override
    @Async("taskExecutor")
    public Future<TaskInfo> updateTaskLog(TaskInfo taskInfo) throws InterruptedException {
        System.out.println("2---------currentThread: " + Thread.currentThread() );
        System.out.println("任务二  Thread Sleep 5s Start, " + taskInfo.getTaskId());
        Thread.sleep(5000);
        taskInfo.setTaskId("002-testId");
        taskInfo.setTaskName("002-teatName");
        System.out.println("任务二  Thread Sleep 5s End, " + taskInfo.getTaskId());
        return new AsyncResult<>( taskInfo );
    }
}

2.4  测试结果

项目中- 多线程异步 Future 使用