Spring Transaction Listener Mechanism: Using @TransactionalEventListener to Run Operations Only After a Database Transaction Commits Successfully
public interface StudentEventRegisterService {
    /**
     * Registers a student and publishes the event.
     */
    void register();
}
2. Publish the event in the implementation class
@Service
public class StudentEventRegisterServiceImpl implements StudentEventRegisterService {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void register() {
        Student student = new Student();
        student.setId(1L);
        student.setName("张三");
        // Publish the Student object itself as the event payload
        applicationEventPublisher.publishEvent(student);
        System.out.println("Student registration event published");
    }
}
3. Listen for the event
@Component
public class StudentEventListener {
    // Invoked only when the published Student has a non-null id
    @EventListener(condition = "#student.id!=null")
    public void handleEvent(Student student) {
        System.out.println("Received student object: " + student);
    }
}
Enabling asynchronous event listening
@Configuration
@EnableAsync
public class AsyncEventConfiguration implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        // Single-threaded pool, for demonstration only
        return Executors.newFixedThreadPool(1);
    }
}
2. Listen for the event asynchronously
@Component
public class StudentEventListener {
    @EventListener(condition = "#student.id!=null")
    @Async
    public void handleEvent(Student student) {
        System.out.println("Received student object: " + student);
    }
}
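To make the effect of @Async concrete, here is a minimal plain-JDK sketch with no Spring involved. It is not how Spring dispatches events internally. It only illustrates the difference between running a handler on the caller's thread and handing it to an executor. The class DispatchSketch, its method names, and the thread name event-pool-0 are hypothetical and chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: compares direct (synchronous) and pooled (asynchronous) dispatch.
public class DispatchSketch {

    /** Runs a handler through the given executor and returns the name of the thread that ran it. */
    static String handlerThread(Executor executor) {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
        return threadName.join();
    }

    /** Dispatches the handler to a one-thread pool whose thread is named event-pool-0. */
    static String asyncHandlerThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(16), r -> new Thread(r, "event-pool-0"));
        try {
            return handlerThread(pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Runnable::run acts as a "direct" executor: the handler runs on the calling thread.
        System.out.println("sync handler ran on:  " + handlerThread(Runnable::run));
        System.out.println("async handler ran on: " + asyncHandlerThread());
    }
}
```

In the synchronous case the handler finishes before the publisher continues, which is what a plain @EventListener does by default. In the pooled case the publisher only hands the work off.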
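Note: creating the thread pool with Executors.newFixedThreadPool, as in the configuration above, is no longer permitted by the Alibaba Java coding guidelines. It is used here for demonstration only.
Positive example 1: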
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
Positive example 2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown
Positive example 3:
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" ref="threadFactory" />
<property name="rejectedExecutionHandler">
<ref bean="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);
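The reason usually given for avoiding Executors.newFixedThreadPool is that it is backed by an unbounded LinkedBlockingQueue, so pending tasks can pile up without limit. The following JDK-only sketch shows the alternative behavior the positive examples rely on: a bounded queue plus AbortPolicy rejects work once the pool is saturated. The class BoundedPoolDemo, its method names, and the sizes used are assumptions for illustration, not part of the guidelines.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a bounded, named thread pool built without Executors.
public class BoundedPoolDemo {

    /** Builds a single-thread pool with a bounded queue; excess tasks are rejected. */
    static ThreadPoolExecutor newBoundedPool(int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory named = r -> {
            Thread t = new Thread(r, "demo-pool-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), named,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Submits blocking tasks and returns how many of them the pool rejected. */
    static int countRejected(int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = newBoundedPool(queueCapacity);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, so the worker stays busy and the queue fills up.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // One running task plus two queued tasks fit; the remaining two are rejected.
        System.out.println("rejected: " + countRejected(2, 5));
    }
}
```

Rejecting early makes overload visible to the caller instead of letting the queue grow until memory runs out. Whether AbortPolicy or another handler is appropriate depends on the application.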
Optimized configuration for enabling async
@Configuration
@EnableAsync
public class AsyncEventConfiguration implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Size the pool from the number of available processors
        taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        taskExecutor.setMaxPoolSize(2 * Runtime.getRuntime().availableProcessors());
        // Bounded queue, so pending tasks cannot accumulate without limit
        taskExecutor.setQueueCapacity(1024);
        taskExecutor.initialize();
        return taskExecutor;
    }
}
Using @TransactionalEventListener
@Transactional
public void saveUser(User u) {
    // Save the user record
    userDao.save(u);
    // Publish the user-saved event
    applicationContext.publishEvent(new SaveUserEvent(u.getId()));
}
2. Listen for the event
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
void onSaveUserEvent(SaveUserEvent event) {
    Integer id = event.getEventData();
    // Runs after the commit, so the saved user can be read back here
    User u = userDao.getUserById(id);
    String phone = u.getPhoneNumber();
    MessageUtils.sendMessage(phone);
}
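The deferral that AFTER_COMMIT provides can be pictured with a small plain-Java model. This is a sketch under stated assumptions, not Spring's implementation: the Tx class, its publish/commit/rollback methods, and the event string are all hypothetical. It only shows the observable behavior, namely that an event published inside a transaction reaches the listener on commit and is dropped on rollback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "deliver events only after commit"; not Spring code.
public class AfterCommitSketch {

    /** Stand-in for a transaction that holds published events until it completes. */
    static final class Tx {
        private final List<Object> pending = new ArrayList<>();
        private final Consumer<Object> listener;

        Tx(Consumer<Object> listener) {
            this.listener = listener;
        }

        /** Queues the event instead of delivering it immediately. */
        void publish(Object event) {
            pending.add(event);
        }

        /** Delivers all queued events to the listener, then clears the queue. */
        void commit() {
            pending.forEach(listener);
            pending.clear();
        }

        /** Discards queued events; the listener never sees them. */
        void rollback() {
            pending.clear();
        }
    }

    /** Publishes one event, then commits or rolls back; returns what the listener received. */
    static List<Object> run(boolean commit) {
        List<Object> received = new ArrayList<>();
        Tx tx = new Tx(received::add);
        tx.publish("SaveUserEvent(1)");
        if (!received.isEmpty()) {
            throw new IllegalStateException("event delivered before the transaction completed");
        }
        if (commit) {
            tx.commit();
        } else {
            tx.rollback();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("after commit:   " + run(true));
        System.out.println("after rollback: " + run(false));
    }
}
```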
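With phase = TransactionPhase.AFTER_COMMIT, the listener method runs only once the current transaction has committed. The phase parameter defaults to AFTER_COMMIT and has four enum values: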