@Scheduled(fixedDelay = 1000*2) public void test2() { Date date = new Date(); System.out.println("tesk2 " + date); }以上表示每隔 2 秒执行一次。间隔时间都是固定的,这个不符合预期,因为要求的时间间隔是依次增加的。
如果执行的任务很多的话,前面的任务要等待后续的任务执行,那延迟就很严重了,就需要使用到多线程,开启多个线程,在《阿里Java开发手册》有一条:❝线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。❞
2.只能单线程处理任务,任务越多,延迟性越久。
@Configuration public class ThreadPoolConfig { // 线程存活时间 private static int keepAliveTime = 10; // 调用线程运行多余任务 RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); @Bean("customerTaskExecutor") public TaskExecutor taskExecutor() { // 核心线程数 int cores = Runtime.getRuntime().availableProcessors()*2; ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(cores); executor.setMaxPoolSize(cores); executor.setKeepAliveSeconds(keepAliveTime); executor.setRejectedExecutionHandler(handler); executor.setThreadNamePrefix("Custom-"); // 线程名前缀 executor.initialize(); return executor; } }其中核心线程数和最大线程数设置成一致,拒绝策略使用调用线程运行多余的任务,确保每个任务都能执行。然后添加一个异步方法.
public interface AsyncService { void executeAsync(); } @Service @Slf4j public class AsyncServiceImpl implements AsyncService { @Override @Async("customerTaskExecutor") public void executeAsync() { log.info("【开始执行任务】"); // 延迟几秒 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } log.info("【结束执行任务】"); } }使用 sleep 方法延迟,模拟请求,使用压测工具,发起 100 次请求,控制台输出如下:
2023-10-31 18:00:32.792 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.811 INFO 53009 --- [ Custom-2] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.813 INFO 53009 --- [ Custom-3] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.814 INFO 53009 --- [ Custom-4] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.816 INFO 53009 --- [ Custom-5] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.817 INFO 53009 --- [ Custom-6] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.819 INFO 53009 --- [ Custom-7] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.820 INFO 53009 --- [ Custom-8] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.821 INFO 53009 --- [ Custom-9] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.823 INFO 53009 --- [ Custom-10] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.824 INFO 53009 --- [ Custom-11] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:32.825 INFO 53009 --- [ Custom-12] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】 2023-10-31 18:00:33.296 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【结束执行任务】 2023-10-31 18:00:33.296 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【开始执行任务】采用线程池执行的任务,多个线程同时执行任务,能有效的降低了任务的延迟性。定时任务间隔固定时间从数据库 Mysql 或者 Redis 获取需要请求的数据,同时执行请求。
@Configuration public class XDelayedMessageConfig { /** * 堆代码 duidaima.com * 延迟交换机 */ public static final String DELAYED_EXCHANGE = "exchange.delayed"; /** * 重试队列 */ public static final String RETRY_QUEUE = "queue.retry"; /** * 重试routing key */ public static final String RETRY_ROUTING_KEY = "routingKey.bind.retry"; @Bean public Queue retryQueue() { return new Queue(RETRY_QUEUE,true); } /** * 定义延迟交换机 * 交换机的类型为 x-delayed-message * @return */ @Bean public CustomExchange delayedExchange() { Map<String,Object> map = new HashMap<>(); map.put("x-delayed-type","direct"); return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map); } @Bean public Binding retryQueueBinding() { return BindingBuilder.bind(retryQueue()).to(delayedExchange()).with(RETRY_ROUTING_KEY).noargs(); } }在发送端模拟重试机制,设置时间间隔 5、10、30 秒。
@Autowired private RabbitTemplate rabbitTemplate; private final int[] INTERVAL_ARRAY= {5,10,30}; @GetMapping("/retry") public String retry(int index) { if (index >= 0 && index <= 2) { send(index +",延迟" + INTERVAL_ARRAY[index] + "s",INTERVAL_ARRAY[index]); } return "ok"; } private void send(String message,Integer delayTime) { message = message + " " + DateUtil.dateFormat(new Date()); System.out.println("【发送消息】" + message); rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.RETRY_ROUTING_KEY, message, message1 -> { message1.getMessageProperties().setDelay(delayTime*1000); return message1; }); }接收端:
@RabbitListener(queues = XDelayedMessageConfig.RETRY_QUEUE) public void delayProcess(String msg, Channel channel, Message message) { System.out.println("【接收消息】" + msg + " 当前时间" + DateUtil.dateFormat(new Date())); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } int index = Integer.parseInt(msg.split(",")[0]); retry(++index); }控制台输出:
【发送消息】0,延迟5s 10:59:29 【接收消息】0,延迟5s 10:59:29 当前时间10:59:33 【发送消息】1,延迟10s 10:59:33 【接收消息】1,延迟10s 10:59:33 当前时间10:59:43 【发送消息】2,延迟30s 10:59:43 【接收消息】2,延迟30s 10:59:43 当前时间11:00:10其中 0、1、2表示重试的次数。通过延迟消息的方式,重试发送信息。每个任务作为一个消息进行消费。和定时服务相比,有以下几个优点:
2.任务不是同时执行,降低服务器的压力。
3.服务器响应异常,服务重启。
1.定时重试首先不符合变化的间隔时间,间隔的时间是固定的,重试的任务都堆积在一起请求,这样也会给服务器造成很大的压力。而空闲的时候,服务器的利用率有比较低。
2.同时请求,只能一个一个同步执行任务,同时执行的任务越多,延迟就越严重。
3.无法满足动态间隔时间的问题,而且同时请求服务器压力大。