谨慎使用Spring 配置标签之任务配置标签
1.导火线
最近都会规律的收到阿里云预警的短信报警,于是,对线上的服务进行了健康度排查,至于排查的方法或者过程不是这里讨论的重点,需要知道的小伙子们,可以看看沈剑和李彦鹏大神的一些干货。这里就不多说了,经过一些列的排查得到的结论是,我们的服务器在一些业务高发的时间段不断的创建/销毁线程,导致内存和CUP的使用率非常的嗨!
2.定位问题
经过上面得到的信息进行分析,发现既然是一个小问题影响的,真是悔不当初呀!这个就是不好好看看操作文档的血淋淋的教训,哪怕是看一眼,所以写出来自勉和跟大家共勉!如题,就是<task:annotation-driver/>配置的问题!
3.问题分析
相信大家在项目中应该有用到Spring 的异步注解@Async,这个是我们的线上的配置:
估计有些小伙伴跟我一样也是一脸懵逼,这有什么问题,spring 很多功能的开启基本都是这些套路了,磨叽,请往下看!
① 先来看一下这个标签的 sxd
注意到红框中的东东了吗!这个是它的默认执行器,咋一说到这里,可能有很多小伙伴还是不太明白,这里我先放一下,先回过头对是spring 异步调用的东东进行一个初级入门先。
4. spring 异步套路分析
我们都是知道spring对标签的初始化的套路都是读取uri的,然后进行解析然后通过反射找到相关的处理从而进行初始化,靠,好绕呀,简单说,就是每一个标签都是用过同名的NamespaceHandler进行处理,例如,上面的<task:annotation-driver/>家族的标签,就是用TaskNamespaceHandler
package org.springframework.scheduling.config; import org.springframework.beans.factory.xml.NamespaceHandlerSupport; /** * {@code NamespaceHandler} for the 'task' namespace. * * @author Mark Fisher * @since 3.0 */ public class TaskNamespaceHandler extends NamespaceHandlerSupport { @Override public void init() { this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser()); this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser()); this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser()); } }
这里就可以看到了,对应的标签的初始化处理器了,其他的标签过程大同小异,怎么感觉讲的有点偏向bean的初始化了,回来了!异步任务处理的主要逻辑spring这是通过AnnotationAsyncExecutionInterceptor进行拦截处理,这里只是列出了一些相关的方法,其他自己去看。
/** * Specialization of {@link AsyncExecutionInterceptor} that delegates method execution to * an {@code Executor} based on the {@link Async} annotation. Specifically designed to * support use of {@link Async#value()} executor qualification mechanism introduced in * Spring 3.1.2. Supports detecting qualifier metadata via {@code @Async} at the method or * declaring class level. See {@link #getExecutorQualifier(Method)} for details. * * @author Chris Beams * @author Stephane Nicoll * @since 3.1.2 * @see org.springframework.scheduling.annotation.Async * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor */ public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor { /** * Return the qualifier or bean name of the executor to be used when executing the * given method, specified via {@link Async#value} at the method or declaring * class level. If {@code @Async} is specified at both the method and class level, the * method's {@code #value} takes precedence (even if empty string, indicating that * the default executor should be used preferentially). * @param method the method to inspect for executor qualifier metadata * @return the qualifier if specified, otherwise empty string indicating that the * {@linkplain #setExecutor(Executor) default executor} should be used * @see #determineAsyncExecutor(Method) */ @Override protected String getExecutorQualifier(Method method) { // Maintainer's note: changes made here should also be made in // AnnotationAsyncExecutionAspect#getExecutorQualifier Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class); if (async == null) { async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class); } return (async != null ? async.value() : null); } }
可以看到这里面其实并没有看到什么,主要的逻辑在父类中
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered { /** * Intercept the given method invocation, submit the actual calling of the method to * the correct task executor and return immediately to the caller. * @param invocation the method to intercept and make asynchronous * @return {@link Future} if the original method returns {@code Future}; {@code null} * otherwise. */ @Override public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object> task = new Callable<Object>() { @Override public Object call() throws Exception { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; } }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); } /** * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor} * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all), * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance * for local use if no default could be found. * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ @Override protected Executor getDefaultExecutor(BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); } }
这个方法还是在父类中,哎,好深的套路
/** * Base class for asynchronous method execution aspects, such as * {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor} * or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}. * * <p>Provides support for <i>executor qualification</i> on a method-by-method basis. * {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code * Executor}, but each individual method may further qualify a specific {@code Executor} * bean to be used when executing it, e.g. through an annotation attribute. * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @since 3.1.2 */ public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware { /** * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor". * <p>Note that the initial lookup happens by type; this is just the fallback * in case of multiple executor beans found in the context. * @since 4.2.6 */ public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor"; // Java 8's CompletableFuture type present? private static final boolean completableFuturePresent = ClassUtils.isPresent( "java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader()); private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<Method, AsyncTaskExecutor>(16); private volatile Executor defaultExecutor; private AsyncUncaughtExceptionHandler exceptionHandler; private BeanFactory beanFactory; /** * Determine the specific executor to use when executing the given method. * Should preferably return an {@link AsyncListenableTaskExecutor} implementation. * @return the executor to use (or {@code null}, but just if no default executor is available) */ protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor = this.defaultExecutor; if (targetExecutor == null) { synchronized (this.executors) { if (this.defaultExecutor == null) { // 这里获取默认的执行器是重点 this.defaultExecutor = getDefaultExecutor(this.beanFactory); } targetExecutor = this.defaultExecutor; } } } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; } /** * Retrieve or build a default executor for this advice instance. * An executor returned from here will be cached for further use. * <p>The default implementation searches for a unique {@link TaskExecutor} bean * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable, this implementation will return {@code null}. * @param beanFactory the BeanFactory to use for a default executor lookup * @return the default executor, or {@code null} if none available * @since 4.2.6 * @see #findQualifiedExecutor(BeanFactory, String) * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ protected Executor getDefaultExecutor(BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; } }
这里有点绕,其实在第二层的父类中已经重写了getDefaultExecutor了,相关的代码,上面已经贴出来了,可以看到的是,默认什么都没有配合的时候,它的默认执行器是SimpleAsyncTaskExecutor,其实在第三小节的我贴出来的sxd限制文档中也有提到的,那到底这个鬼到底干了什么呢?上代码!
/** * Template method for the actual execution of a task. * <p>The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */ protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }
这个是的主要执行代码,看到的是,当threadFactory 为空的时候,他回去调用createThread这个创建线程的方法去创建一个新的线程去执行目标代码的
/** * Template method for the creation of a new {@link Thread}. * <p>The default implementation creates a new Thread for the given * {@link Runnable}, applying an appropriate thread name. * @param runnable the Runnable to execute * @see #nextThreadName() */ public Thread createThread(Runnable runnable) { Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; }
可以看到的,当我们业务很是频繁的时候,这里就会不断的创建线程,我的天呀,服务能挺到现在也不容易!异步执行的套路分析到这里!
5. 总结
- 其实任务执行框架是有给我们提供相应的线程池的执行器和配置方法,就是没有看呀,这个是一知半解的危害呀!
- 解决方法也很简单,直接通过文档配置ThreadPoolTaskExecutor 这个自带的线程池就可以满足大部分的业务场景了,但是我们这边用到的线程变量进行登陆人信息,要求转到异步的时候也要能拿到这些线程变量,所以用了其他方式,迟点在跟大家分享了!
水平有限,GGYY乱说一通,有疑问大家可以探讨一下,莫喷!!!!