摘要: 原创出处 https://muyinchen.github.io/2017/09/28/Spring5%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90 —— Spring%E4%B8%AD%E7%9A%84%E5%BC%82%E6%AD%A5%E4%BA%8B%E4%BB%B6/ 「一叶知秋」欢迎转载,保留摘要,谢谢!


上一篇 Spring框架中的事件和监听器并未对Spring框架中的异步事件涉及太多,所以本篇是对其一个补充。

同步事件有一个主要缺点:它们在所调用线程的本地执行(也就是将所调用线程看成主线程的话,就是在主线程里依次执行)。如果监听器处理同步事件需要5秒钟的响应,则最终结果是用户将在至少5秒内无法看到响应(可以通过Spring框架中的事件和监听器中的例子了解具体)。所以,我们可以通过一个替代方案来解决这个问题 - 异步事件。



在Spring中处理异步事件是基于本地的Java并发解决方案—任务执行器(可以了解下Java Executor框架的内容)。事件由multicastEvent 方法调度。它通过使用java.util.concurrent.Executor接口的实现将事件发送到专用的监听器。Multicaster会调用同步执行器,因为它是默认实现,这点在Spring框架中的事件和监听器有明确的例子,从源码的角度也就是是否设置有SyncTaskExecutor实例。从public void setTaskExecutor(@Nullable Executor taskExecutor)其中,@Nullable 可看出Executor参数可为null,默认不设置的话,multicastEvent也就直接 跳过异步执行了


public void multicastEvent(ApplicationEvent event) {
multicastEvent(event, resolveDefaultEventType(event));

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
else {
invokeListener(listener, event);

private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
return ResolvableType.forInstance(event);

* Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
* to invoke each listener with.
* <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
* executing all listeners synchronously in the calling thread.
* <p>Consider specifying an asynchronous task executor here to not block the
* caller until all listeners have been executed. However, note that asynchronous
* execution will not participate in the caller's thread context (class loader,
* transaction association) unless the TaskExecutor explicitly supports this.
* @see org.springframework.core.task.SyncTaskExecutor
* @see org.springframework.core.task.SimpleAsyncTaskExecutor
* @Nullable 可看出Executor参数可为null,默认不设置的话,上面multicastEvent也就直接 * 跳过异步执行了
public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = taskExecutor;

* Return the current task executor for this multicaster.
protected Executor getTaskExecutor() {
return this.taskExecutor;


* Executes the given task, within a concurrency throttle
* if configured (through the superclass's settings).
* @see #doExecute(Runnable)
public void execute(Runnable task) {
execute(task, TIMEOUT_INDEFINITE);

* Executes the given task, within a concurrency throttle
* if configured (through the superclass's settings).
* <p>Executes urgent tasks (with 'immediate' timeout) directly,
* bypassing the concurrency throttle (if active). All other
* tasks are subject to throttling.
* @see #doExecute(Runnable)
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
else {

public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;

public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask<>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));



* Return the underlying ThreadPoolExecutor for native access.
* @return the underlying ThreadPoolExecutor (never {@code null})
* @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
return this.threadPoolExecutor;

public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);

public void execute(Runnable task, long startTimeout) {

public Future<?> submit(Runnable task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);



* 下面的注释意思很明显了,不多说了
* {@link TaskExecutor} implementation that executes each task <i>synchronously</i>
* in the calling thread.
* <p>Mainly intended for testing scenarios.
* <p>Execution in the calling thread does have the advantage of participating
* in it's thread context, for example the thread context class loader or the
* thread's current transaction association. That said, in many cases,
* asynchronous execution will be preferable: choose an asynchronous
* {@code TaskExecutor} instead for such scenarios.
* @author Juergen Hoeller
* @since 2.0
* @see SimpleAsyncTaskExecutor
public class SyncTaskExecutor implements TaskExecutor, Serializable {

* Executes the given {@code task} synchronously, through direct
* invocation of it's {@link Runnable#run() run()} method.
* @throws IllegalArgumentException if the given {@code task} is {@code null}
public void execute(Runnable task) {
Assert.notNull(task, "Runnable must not be null");



<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
<bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 10 task will be submitted immediately -->
<property name="corePoolSize" value="10" />
<!-- If 10 task are already submitted and treated, we allow to enlarge pool capacity to 15 (10 from core pool size + 5 from max pool size) -->
<property name="maxPoolSize" value="15" />
<!-- Number of tasks that can be placed into waiting queue -->
<property name="queueCapacity" value="10" />

<bean id="applicationEventMulticaster" class="com.migo.event.SimpleEventMulticaster">
<property name="taskExecutor" ref="syncTaskExecutor" />
<property name="asyncTaskExecutor" ref="asyncTaskExecutor" />
<bean id="taskStatsHolder" class="com.migo.event.TaskStatsHolder" />


// TaskStatsHolder.java
** Holder bean for all executed tasks.
public class TaskStatsHolder {

private Map<String, TaskStatData> tasks = new HashMap<String, TaskStatData>();

public void addNewTaskStatHolder(String key, TaskStatData value) {
tasks.put(key, value);

public TaskStatData getTaskStatHolder(String key) {
return tasks.get(key);

// TaskStatData.java
** Holder class for all statistic data about already executed tasks.
public class TaskStatData {

private String threadName;
private int executionTime;
private long startTime;
private long endTime;

public TaskStatData(String threadName, long startTime, long endTime) {
this.threadName = threadName;
this.startTime = startTime;
this.endTime = endTime;
this.executionTime = Math.round((endTime - startTime) / 1000);

public String getThreadName() {
return threadName;
public int getExecutionTime() {
return this.executionTime;
public long getStartTime() {
return this.startTime;
public long getEndTime() {
return this.endTime;

public String toString() {
StringBuilder result = new StringBuilder();
result.append("TaskStatData {thread name: ").append(this.threadName).append(", start time: ").append(new Date(this.startTime));
result.append(", end time: ").append(new Date(this.endTime)).append(", execution time: ").append(this.executionTime).append(" seconds}");
return result.toString();



// ProductChangeFailureEvent.java
* This is synchronous event dispatched when one product is modified in the backoffice.
* When product's modification fails (database, validation problem), this event is dispatched to
* all listeners. It's synchronous because we want to inform the user that some actions were done
* after the failure. Otherwise (asynchronous character of event) we shouldn't be able to
* know if something was done or not after the dispatch.
public class ProductChangeFailureEvent extends ApplicationContextEvent {

private static final long serialVersionUID = -1681426286796814792L;
public static final String TASK_KEY = "ProductChangeFailureEvent";

public ProductChangeFailureEvent(ApplicationContext source) {

// NotifMailDispatchEvent.java
* Event dispatched asynchronously every time when we want to send a notification mail.
* Notification mails to send should be stored somewhere (filesystem, database...) but in
* our case, we'll handle only one notification mail: when one product out-of-stock becomes available again.
public class NotifMailDispatchEvent extends ApplicationContextEvent implements AsyncApplicationEvent {

private static final long serialVersionUID = 9202282810553100778L;
public static final String TASK_KEY = "NotifMailDispatchEvent";

public NotifMailDispatchEvent(ApplicationContext source) {


// ProductChangeFailureListener.java
public class ProductChangeFailureListener
implements ApplicationListener<ProductChangeFailureEvent>{

public void onApplicationEvent(ProductChangeFailureEvent event) {
long start = System.currentTimeMillis();
long end = System.currentTimeMillis();
((TaskStatsHolder) event.getApplicationContext().getBean("taskStatsHolder")).addNewTaskStatHolder(ProductChangeFailureEvent.TASK_KEY, new TaskStatData(Thread.currentThread().getName(), start, end));


// NotifMailDispatchListener.java
public class NotifMailDispatchListener
implements ApplicationListener<NotifMailDispatchEvent>{

public void onApplicationEvent(NotifMailDispatchEvent event) throws InterruptedException {
long start = System.currentTimeMillis();
// sleep 5 seconds to avoid that two listeners execute at the same moment
long end = System.currentTimeMillis();
((TaskStatsHolder) event.getApplicationContext().getBean("taskStatsHolder")).addNewTaskStatHolder(NotifMailDispatchEvent.TASK_KEY, new TaskStatData(Thread.currentThread().getName(), start, end));


public class ProductController {

private ApplicationContext context;

@RequestMapping(value = "/products/change-failure")
public String changeFailure() {
try {
System.out.println("I'm modifying the product but a NullPointerException will be thrown");
String name = null;
if (name.isEmpty()) {
// show error message here
throw new RuntimeException("NullPointerException");
} catch (Exception e) {
context.publishEvent(new ProductChangeFailureEvent(context));
return "success";

@RequestMapping(value = "/products/change-success")
public String changeSuccess() {
System.out.println("Product was correctly changed");
context.publishEvent(new NotifMailDispatchEvent(context));
return "success";


public class SpringSyncAsyncEventsTest {

private WebApplicationContext wac;

public void test() {
MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
// execute both urls simultaneously

// get stats holder and check if both stats are available:
// - mail dispatching shouldn't be available because it's executed after a sleep of 5 seconds
// - product failure should be available because it's executed synchronously, almost immediately (no operations in listeners)
TaskStatsHolder statsHolder = (TaskStatsHolder) this.wac.getBean("taskStatsHolder");
TaskStatData mailStatData = statsHolder.getTaskStatHolder(NotifMailDispatchEvent.TASK_KEY);
TaskStatData productFailureData = statsHolder.getTaskStatHolder(ProductChangeFailureEvent.TASK_KEY);
assertTrue("Task for mail dispatching is executed after 5 seconds, so at this moment, it taskStatsHolder shouldn't contain it",
mailStatData == null);
assertTrue("productFailureHolder shouldn't be null but it is",
productFailureData != null);
assertTrue("Product failure listener should be executed within 0 seconds but took "+productFailureData.getExecutionTime()+" seconds",
productFailureData.getExecutionTime() == 0);
while (mailStatData == null) {
mailStatData = statsHolder.getTaskStatHolder(NotifMailDispatchEvent.TASK_KEY);

// check mail dispatching stats again, when available
assertTrue("Now task for mail dispatching should be at completed state",
mailStatData != null);
assertTrue("Task for mail dispatching should take 5 seconds but it took "+mailStatData.getExecutionTime()+" seconds",
mailStatData.getExecutionTime() == 5);
assertTrue("productFailureHolder shouldn't be null but it is",
productFailureData != null);
assertTrue("Product failure listener should be executed within 0 seconds but took "+productFailureData.getExecutionTime()+" seconds",
productFailureData.getExecutionTime() == 0);
assertTrue("Thread executing mail dispatch and product failure listeners shouldn't be the same",
assertTrue("Thread executing product failure listener ("+productFailureData.getThreadName()+") should be the same as current thread ("+Thread.currentThread().getName()+") but it wasn't",
assertTrue("Thread executing mail dispatch listener ("+mailStatData.getThreadName()+") shouldn't be the same as current thread ("+Thread.currentThread().getName()+") but it was",
// make some output to see the informations about tasks
System.out.println("Data about mail notif dispatching event: "+mailStatData);
System.out.println("Data about product failure dispatching event: "+productFailureData);

因之前整理的笔记此处SimpleEventMulticaster忘了放进去,也懒得去找了,可以通过xml定义去查看下,这个测试用例可以看出两个listener不是由同一个executor启动的,Product failure 监听器由同步执行器执行。因为他们没有做任何操作,几乎立即返回结果。关于邮件调度事件,通过休眠5秒可以得到其执行时间超过Product failure 监听器的执行时间。通过分析输出可以知道,两者在不同的线程中执行,所以由不同的执行器执行(关于这俩执行器的例子可以再搜下相关博文,其实主要还是想表达SyncTaskExecutor是在主线程里执行,而asyncTaskExecutor由线程池里管理的线程执行)。

Product was correctly changed
I'm modifying the product but a NullPointerException will be thrown
Data about mail notif dispatching event: TaskStatData {thread name: asyncTaskExecutor-1(异步线程), start time: Thu Jun 19 21:14:18 CEST 2016, end time: Thu Jun 19 21:14:23 CEST 2016, execution time: 5 seconds}
Data about product failure dispatching event: TaskStatData {thread name: main(主线程), start time: Thu Jun 19 21:14:21 CEST 2016, end time: Thu Jun 19 21:14:21 CEST 2016, execution time: 0 seconds}


666. 彩蛋

如果你对 Spring 感兴趣,欢迎加入我的知识星球一起交流。


