关于Spring中的@Async注解以及为什么不建议使用 - Java技术债务


简介

Async 注解是 Java 8 中的一个注解,用于标识一个方法是异步执行的。当一个方法被标记为 Async 时,该方法将在一个新的线程中执行,并且可以立即返回一个 CompletableFuture 对象。使用 CompletableFuture 可以更轻松地管理异步计算的结果。下面是一个使用 Async 注解的示例代码:

@Async
public CompletableFuture doSomethingAsync() {    
	// 异步执行一些操作
}

在上面的代码中,doSomethingAsync() 方法被标记为 Async,这意味着该方法将在一个新的线程中异步执行,同时返回一个 CompletableFuture 对象。

应用场景

  • 同步: 同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。
  • 异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。

例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕;如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。

在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的业务子线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。

Spring 已经实现的线程池

  1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
  2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。
  3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
  4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
  5. ThreadPoolTaskExecutor :最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装。

异步的方法

  1. 最简单的异步调用,返回值为void
  2. 带参数的异步调用,异步方法可以传入参数
  3. 存在返回值,常调用返回Future

Spring中启用@Async

配置类

@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    /**
     * 核心线程
     */
    private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() == 1 ? 4 : Runtime.getRuntime().availableProcessors();

    @Primary
    @Bean("commonExecutor")
    public Executor commonExecutor() {
        log.info("==== start initialize common async executor =====");
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(CORE_POOL_SIZE);
        taskExecutor.setQueueCapacity(1024);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("commonExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(20);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("xmarsRecommendationExecutor")
    public Executor xmarsRecommendationExecutor() {
        log.info("==== start initialize xmars recommendation async executor =====");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(CORE_POOL_SIZE);
        executor.setQueueCapacity(10240);
        executor.setKeepAliveSeconds(120);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("xmars-recommendation-executor--");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return commonExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return ((ex, method, params) -> log.error("执行异步任务:{}", method, ex));
    }
}

启用方式

// Spring boot启用:
@EnableAsync
@EnableTransactionManagement
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

@Async默认线程池

Spring应用默认的线程池,指在@Async注解在使用时,不指定线程池的名称,@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。

使用方式

无返回值调用

基于@Async无返回值调用,直接在使用类,使用方法(建议在使用方法)上,加上注解。若需要抛出异常,需手动new一个异常抛出。

/**
 * 带参数的异步调用 异步方法可以传入参数    
 *  对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉    

 * @param s
 */
@Async
public void asyncInvokeWithException(String s) {
    log.info("asyncInvokeWithParameter, parementer={}", s);
    throw new IllegalArgumentException(s);
}

有返回值Future调用

/**
 * 异常调用返回Future
 * 对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
 * 或者在调用方在调用Futrue.get时捕获异常进行处理
 *
 * @param i
 * @return
 */
@Async
public Future asyncInvokeReturnFuture(int i) {
    log.info("asyncInvokeReturnFuture, parementer={}", i);
    Future future;
    try {
        Thread.sleep(1000 * 1);
        future = new AsyncResult("success:" + i);
        throw new IllegalArgumentException("a");
    } catch (InterruptedException e) {
        future = new AsyncResult("error");
    } catch (IllegalArgumentException e) {
        future = new AsyncResult("error-IllegalArgumentException");
    }
    return future;
}

以上两种使用也可以使用CompletableFuture 代替,具体详情请看:

CompletableFuture

默认线程池的弊端

  • 在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。* Executors各个方法的弊端:
  • newFixedThreadPoolnewSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  • newCachedThreadPoolnewScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

@Async应用自定义线程池

自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。自定义线程池有如下模式:

  • 重新实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

通过查看Spring源码关于@Async的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。且重新实现 public Executor getAsyncExecutor()方法。

实现接口AsyncConfigurer

详情请看上边配置类https://www.notion.so/cuizb/Async-46059826a5fb4e1a911753d911703fdf?pvs=4#dc5bb44d51a0448b861738a337ea7f0e

继承类AsyncConfigurerSupport

类AsyncConfigurerSupport实现接口AsyncConfigurer

配置自定义的TaskExecutor

由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。

所以可以在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class便可。

比如:

Executor.class:ThreadPoolExecutorAdapter->ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor

这样的模式,最终底层为Executor.class,在替换默认的线程池时,需设置默认的线程池名称为TaskExecutor

TaskExecutor.class:ThreadPoolTaskExecutor->SchedulingTaskExecutor->AsyncTaskExecutor->TaskExecutor

这样的模式,最终底层为TaskExecutor.class,在替换默认的线程池时,可不指定线程池名称。最新面试题整理好了,点击Java面试库 小程序在线刷题。

@EnableAsync
@Configuration
public class TaskPoolConfig {
   @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
   public Executor taskExecutor() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
       executor.setCorePoolSize(10);
       //最大线程数
       executor.setMaxPoolSize(20);
       //队列容量
       executor.setQueueCapacity(200);
       //活跃时间
       executor.setKeepAliveSeconds(60);
       //线程名字前缀
       executor.setThreadNamePrefix("taskExecutor-");
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       return executor;
   }
  @Bean(name = "new_task")
   public Executor taskExecutor() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
       executor.setCorePoolSize(10);
       //最大线程数
       executor.setMaxPoolSize(20);
       //队列容量
       executor.setQueueCapacity(200);
       //活跃时间
       executor.setKeepAliveSeconds(60);
       //线程名字前缀
       executor.setThreadNamePrefix("taskExecutor-");
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       return executor;
   }
}

@Async注解,使用系统默认或者自定义的线程池(代替默认线程池)。可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("new_task")

源码解析

AsyncExecutionAspectSupport.java

// 定义了一个受保护的方法,该方法接受一个Method类型的参数,并返回一个AsyncTaskExecutor类型的对象。  
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {  
    // 尝试从executors这个Map中获取与给定方法关联的AsyncTaskExecutor。  
    AsyncTaskExecutor executor = this.executors.get(method);  
  
    // 如果executor为空,表示当前方法还没有对应的AsyncTaskExecutor。  
    if (executor == null) {  
        // 根据方法确定执行器的标识符。  
        String qualifier = getExecutorQualifier(method);  
  
        // 如果获取到了非空的执行器标识符。  
        if (StringUtils.hasLength(qualifier)) {  
            // 在beanFactory中查找具有指定标识符的执行器。  
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);  
        }  
        // 如果没有获取到执行器标识符,则使用默认的执行器。  
        else {  
            targetExecutor = this.defaultExecutor.get();  
        }  
  
        // 如果仍然没有找到执行器,则返回null。  
        if (targetExecutor == null) {  
            return null;  
        }  
  
        // 如果targetExecutor是AsyncListenableTaskExecutor的实例,则直接赋值给executor。  
        // 否则,使用targetExecutor创建一个新的TaskExecutorAdapter并将其赋值给executor。  
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?  
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));  
  
        // 将方法和执行器的关联存储到executors这个Map中。  
        this.executors.put(method, executor);  
    }  
  
    // 返回与给定方法关联的AsyncTaskExecutor。  
    return executor;  
}

简单总结:

  1. 这个方法首先尝试从executors的Map中直接获取与给定方法关联的AsyncTaskExecutor
  2. 如果Map中没有,它会根据方法的某些特性(通过getExecutorQualifier方法)来查找或确定一个执行器。
  3. 如果找到了执行器,它会检查这个执行器是否是AsyncListenableTaskExecutor的实例。如果是,则直接使用;如果不是,则将其包装在TaskExecutorAdapter中。
  4. 最后,将方法与确定的执行器关联存储在executors的Map中,并返回该执行器。

AsyncConfigurer.java

public interface AsyncConfigurer {

	/**
	 * The {@link Executor} instance to be used when processing async
	 * method invocations.
	 */
	@Nullable
	default Executor getAsyncExecutor() {
		return null;
	}

	/**
	 * The {@link AsyncUncaughtExceptionHandler} instance to be used
	 * when an exception is thrown during an asynchronous method execution
	 * with {@code void} return type.
	 */
	@Nullable
	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return null;
	}
}

Untitled

AsyncExecutionAspectSupport.java

// 定义了一个受保护的方法,该方法接受一个可空的BeanFactory作为参数,并返回一个Executor类型的对象。  
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {  
    // 如果beanFactory不为空,则继续执行以下逻辑。  
    if (beanFactory != null) {  
        try {  
            // 尝试从beanFactory中获取一个TaskExecutor类型的bean。  
            // 这里特别指定了TaskExecutor而不是普通的Executor,因为后者可能会匹配到ScheduledExecutorService,  
            // 这对于我们的目的来说是不可用的。TaskExecutor更明确地为此设计。  
            return beanFactory.getBean(TaskExecutor.class);  
        }  
        catch (NoUniqueBeanDefinitionException ex) {  
            // 如果找到了多个TaskExecutor bean,记录一条debug级别的日志。  
            logger.debug("Could not find unique TaskExecutor bean", ex);  
            try {  
                // 尝试获取名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor类型的bean。  
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);  
            }  
            catch (NoSuchBeanDefinitionException ex2) {  
                // 如果连默认的TaskExecutor bean都没有找到,并且logger的info级别是启用的,  
                // 则记录一条info级别的日志,说明找到了多个TaskExecutor bean,但没有一个是名为'taskExecutor'的,  
                // 并列出所有找到的bean的名称。  
                if (logger.isInfoEnabled()) {  
                    logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. " +  
                            "Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());  
                }  
            }  
        }  
        catch (NoSuchBeanDefinitionException ex) {  
            // 如果连一个TaskExecutor bean都没有找到,记录一条debug级别的日志。  
            logger.debug("Could not find default TaskExecutor bean", ex);  
            try {  
                // 再次尝试获取名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor类型的bean。  
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);  
            }  
            catch (NoSuchBeanDefinitionException ex2) {  
                // 如果连名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的bean都没有找到,记录一条info级别的日志,  
                // 说明没有找到用于异步处理的TaskExecutor bean。  
                logger.info("No task executor bean found for async processing: " +  
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");  
            }  
            // 如果没有找到任何可用的bean,方法返回null。  
        }  
    }  
    // 如果beanFactory本身就是null,方法也返回null。  
    return null;  
}

https://mmbiz.qpic.cn/mmbiz_png/mR4CwoLXicg3LaOuyIHahh7jtrUvagx2fV3FSO6caGkylwaMw37PHiayowQkDjxV82c2Duic9d58eiclekW5WkKB1g/640?wx_fmt=png

这个方法通过一系列的异常捕获来尝试获取一个可用的 TaskExecutor bean。如果beanFactory中没有找到任何 TaskExecutor bean,或者没有找到名为 DEFAULT_TASK_EXECUTOR_BEAN_NAME 的bean,方法最终会返回null。这样的设计允许开发者在Spring容器中配置一个或多个 TaskExecutor bean,并通过名称或标记其中一个为primary来指定哪个bean应该被用作默认的异步任务执行器。如果没有找到任何bean,则框架可能会回退到使用本地的默认执行器,或者完全不使用执行器。

如果没有找到项目中设置的默认线程池时,采用spring 默认的线程池

/**
 * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
 * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
 * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
 * for local use if no default could be found.
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

为什么不建议直接使用 @Async 注解?

在泰山版《阿里巴巴开发手册》规定开发中不建议使用 Async 注解,这是为什么?在实际开发中,异步编程已经成为了一个必备的技能。为了帮助开发者更轻松地进行异步编程,Java 8 引入了 Async 注解,使得异步编程变得更加简单。然而,虽然 Async 注解可以为我们带来很多好处,但是在某些情况下直接使用 Async 注解并不是一个好主意。

Untitled

Async 注解的优点

使用 Async 注解有以下优点:

  • 简化异步编程:使用 Async 注解可以使得异步编程变得更加简单。开发者只需要将需要异步执行的方法标记为 Async,然后使用 CompletableFuture 来管理异步计算的结果即可。
  • 提高应用程序的响应速度:使用 Async 注解可以将耗时的操作异步执行,从而避免阻塞主线程,提高应用程序的响应速度。
  • 提高系统的并发性能:使用 Async 注解可以使得多个异步操作并发执行,从而提高系统的并发性能。

Async 注解的局限性

然而,虽然 Async 注解可以为我们带来很多好处,但是它也有一些局限性。

  • 异常处理:使用 Async 注解时,异常处理可能会变得更加复杂。由于异步操作是在另一个线程中执行的,因此如果异步操作抛出了异常,这个异常可能不会被捕获。为了解决这个问题,开发者需要使用 CompletableFuture 的异常处理机制来捕获异步操作抛出的异常。
  • 内存占用:使用 Async 注解时,由于每个异步操作都会在一个新的线程中执行,因此可能会导致大量的线程被创建。这可能会导致内存占用过高,从而导致应用程序性能下降。
  • 阻塞操作:使用 Async 注解时,如果异步操作中包含了阻塞操作,这可能会导致线程池中的线程被阻塞,从而导致应用程序性能下降。

不建议直接使用 Async 注解的原因

由于 Async 注解的局限性,直接使用 Async 注解可能不是一个好主意。下面是不建议直接使用 Async 注解的原因:

  • 可能会导致性能问题:由于 Async 注解会创建新的线程来执行异步操作,因此如果使用不当,可能会导致线程池中的线程被过度消耗,从而导致性能问题。
  • 可能会导致内存泄漏问题:如果使用 Async 注解时没有正确地管理线程池,可能会导致内存泄漏问题。例如,如果不正确地配置线程池大小,可能会导致线程池中的线程无法回收,从而导致内存泄漏。
  • 可能会导致死锁问题:如果异步操作中包含了阻塞操作,可能会导致线程池中的线程被阻塞,从而导致死锁问题。

综上所述,直接使用 Async 注解可能会导致各种问题,因此不建议直接使用 Async 注解。

如何更好地使用 Async 注解

虽然不建议直接使用 Async 注解,但是在某些情况下,使用 Async 注解仍然是一个不错的选择。下面是一些使用 Async 注解的最佳实践:

  • 配置线程池:使用 Async 注解时,应该配置合适的线程池大小。线程池的大小应该根据应用程序的性质和需求来确定。
  • 使用异常处理机制:使用 Async 注解时,应该使用 CompletableFuture 的异常处理机制来捕获异步操作抛出的异常。
  • 避免阻塞操作:使用 Async 注解时,应该避免在异步操作中包含阻塞操作。如果必须使用阻塞操作,应该使用 CompletableFuture 的 supplyAsync() 方法来确保阻塞操作在一个新的线程中执行。
  • 使用 CompletableFuture API:使用 Async 注解时,应该使用 CompletableFuture API 来管理异步计算的结果。使用 CompletableFuture API 可以更加轻松地处理异步操作的结果,并避免一些潜在的问题。

综上所述,使用 Async 注解可以为我们带来很多好处,但是在使用 Async 注解时需要注意一些问题,以避免出现性能问题、内存泄漏问题和死锁问题等。因此,在使用 Async 注解时,我们应该遵循一些最佳实践来确保代码的正确性和性能。

   登录后才可以发表呦...

专注分享Java技术干货,包括
但不仅限于多线程、JVM、Spring Boot
Spring Cloud、 Redis、微服务、
消息队列、Git、面试题 最新动态等。

想交个朋友吗
那就快扫下面吧


微信

Java技术债务

你还可以关注我的公众号

会分享一些干货或者好文章

Java技术债务