2.在社交平台里,消息延迟推送能够优化用户体验,让信息在合适的时间触达。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.15.6</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>配置文件
spring: redis: host: localhost port: 6379 password: root enabled: true database: 7 redisson: # 单节点配置 singleServerConfig: # 连接空闲超时,单位:毫秒 idleConnectionTimeout: 10000 # 连接超时,单位:毫秒 connectTimeout: 10000 # 命令等待超时,单位:毫秒 timeout: 3000 # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。 # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryAttempts: 3 # 命令重试发送时间间隔,单位:毫秒 retryInterval: 1500 # 密码 password: root # 客户端名称 clientName: yian # 单个连接最大订阅数量 subscriptionsPerConnection: 5 # 节点地址 address: redis://localhost:6379 # 发布和订阅连接的最小空闲连接数 subscriptionConnectionMinimumIdleSize: 1 # 发布和订阅连接池大小 subscriptionConnectionPoolSize: 50 # 最小空闲连接数 connectionMinimumIdleSize: 32 # 连接池大小 connectionPoolSize: 64 # 数据库编号 database: 7 # DNS监测时间间隔,单位:毫秒 dnsMonitoringInterval: 5000 # 传输模式 transportMode: "NIO"消息模型定义
/**
 * Envelope stored in the delay queue for a single delayed task.
 *
 * <p>Field order matters: the Lombok-generated all-args constructor takes its
 * parameters in declaration order.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTaskMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Key used by the executor to look up the processor that handles this message. */
    private String messageType;

    /** Identifier handed to the processor together with the decoded payload. */
    private String uniqueKey;

    /** Delay, in milliseconds, applied when the message is offered to the delayed queue. */
    private long delayTimeMs;

    /** Business payload as a string; the executor decodes it with an ObjectMapper. */
    private String payload;

    /** Upper bound on how many times a failed task is re-published. */
    private int maxRetryCount;

    /** Number of retries already performed for this message. */
    private int retryCount;
}
业务消息体示例
/**
 * Example business payload describing an order-cancellation task.
 *
 * <p>Field order matters: the Lombok-generated all-args constructor takes its
 * parameters in declaration order.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderCancelPayload implements Serializable {

    private static final long serialVersionUID = 2L;

    /** Identifier of the order to cancel. */
    private String orderId;

    /** Free-form description attached to the task. */
    private String desc;

    /** Timestamp text supplied by whoever builds the payload. */
    private String time;
}
核心功能
/**
 * Publishes {@link DelayTaskMessage}s to the Redisson delayed queue whose
 * destination is the {@code delay_task_queue} queue.
 */
@Service
public class DelayTaskPublisher {

    /** Destination queue that receives a message once its delay has elapsed. */
    private static final String QUEUE_NAME = "delay_task_queue";

    /**
     * Single delayed-queue handle shared by every publish call.
     *
     * <p>Creating it once at startup instead of on every call avoids allocating a
     * new handle per message. Per Redisson's implementation the element-transfer
     * task is scheduled when the handle is created, so creating it eagerly also
     * lets tasks that were still pending from a previous run be moved to the
     * destination queue without waiting for the first publish.
     */
    private final RDelayedQueue<DelayTaskMessage> delayedQueue;

    /**
     * @param redisson client used to obtain the destination queue and its delayed queue
     */
    public DelayTaskPublisher(RedissonClient redisson) {
        RQueue<DelayTaskMessage> destination = redisson.getQueue(QUEUE_NAME);
        this.delayedQueue = redisson.getDelayedQueue(destination);
    }

    /**
     * Schedules {@code message} for delivery after {@code message.getDelayTimeMs()} milliseconds.
     *
     * @param message task to schedule; must not be null
     * @throws NullPointerException if {@code message} is null
     * @throws IOException never thrown by this implementation; kept in the signature
     *                     so existing callers that catch it keep compiling
     */
    public void publish(DelayTaskMessage message) throws IOException {
        if (message == null) {
            throw new NullPointerException("message must not be null");
        }
        delayedQueue.offer(message, message.getDelayTimeMs(), TimeUnit.MILLISECONDS);
    }
}
任务执行引擎
/**
 * Consumes due messages from the {@code delay_task_queue} queue and dispatches each
 * one to the {@link DelayTaskProcessor} registered under the message's type.
 *
 * <p>A single polling thread (started by {@link #start()}) takes messages off the
 * queue and hands them to a fixed pool of 10 worker threads. Failed tasks are
 * re-published with an exponentially growing delay until their retry budget is
 * exhausted.
 *
 * <p>The instance is one-shot: once {@link #stop()} has taken effect the worker pool
 * is shut down and the executor cannot be started again.
 */
@Slf4j
@Service
public class DelayTaskExecutor implements Runnable {

    private static final String QUEUE_NAME = "delay_task_queue";

    /** How long one blocking poll waits before re-checking the running flag. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /** Pause after an unexpected polling error so a Redis outage does not flood the log. */
    private static final long ERROR_BACKOFF_MS = 1000L;

    private final RedissonClient redisson;
    private final Map<String, DelayTaskProcessor> delayTaskProcessors;
    private final ExecutorService threadPool;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final DelayTaskPublisher publisher;
    private final ObjectMapper objectMapper;

    /**
     * @param redisson            client used to read the destination queue
     * @param delayTaskProcessors processors keyed by the message type they handle
     * @param publisher           used to re-publish failed tasks for retry
     * @param objectMapper        decodes the message payload into the processor's payload class
     */
    public DelayTaskExecutor(RedissonClient redisson,
                             Map<String, DelayTaskProcessor> delayTaskProcessors,
                             DelayTaskPublisher publisher,
                             ObjectMapper objectMapper) {
        this.redisson = redisson;
        this.delayTaskProcessors = delayTaskProcessors;
        this.publisher = publisher;
        this.objectMapper = objectMapper;
        this.threadPool = Executors.newFixedThreadPool(10);
    }

    /**
     * Starts the polling thread; calling it again while already running is a no-op.
     *
     * @throws IllegalStateException if the executor has already been stopped
     */
    public void start() {
        if (threadPool.isShutdown()) {
            throw new IllegalStateException("DelayTaskExecutor has been stopped and cannot be restarted");
        }
        if (!running.getAndSet(true)) {
            new Thread(this, "delay-task-executor").start();
        }
    }

    /**
     * Asks the polling loop to exit. The loop notices within one poll timeout and
     * then shuts the worker pool down, letting already submitted tasks finish.
     */
    public void stop() {
        running.set(false);
    }

    /**
     * Polling loop. Blocks on the queue with a timeout instead of spinning on a
     * non-blocking poll, so an empty queue costs neither CPU nor a stream of Redis
     * round-trips.
     */
    @Override
    public void run() {
        if (!running.get()) {
            return;
        }
        // Fully qualified so the block needs no additional import lines.
        org.redisson.api.RBlockingQueue<DelayTaskMessage> queue = redisson.getBlockingQueue(QUEUE_NAME);
        try {
            while (running.get()) {
                try {
                    DelayTaskMessage message =
                            queue.poll(POLL_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS);
                    if (message != null) {
                        threadPool.submit(() -> processMessage(message));
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a shutdown request and keep the flag set for callers.
                    Thread.currentThread().interrupt();
                    running.set(false);
                } catch (Exception e) {
                    log.error("任务处理异常", e);
                    backOffAfterError();
                }
            }
        } finally {
            running.set(false);
            // Shut down only after the loop has ended, so no submit can race with it.
            threadPool.shutdown();
        }
    }

    /** Sleeps briefly after a polling failure; an interrupt during the sleep stops the loop. */
    private void backOffAfterError() {
        try {
            Thread.sleep(ERROR_BACKOFF_MS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            running.set(false);
        }
    }

    /**
     * Decodes the payload and runs the matching processor. A message whose type has
     * no registered processor is logged and dropped; any failure goes to the retry path.
     */
    private void processMessage(DelayTaskMessage message) {
        try {
            DelayTaskProcessor processor = delayTaskProcessors.get(message.getMessageType());
            if (processor == null) {
                log.warn("未找到消息处理器: {}", message.getMessageType());
                return;
            }
            Object payload = objectMapper.readValue(message.getPayload(), processor.getPayloadClass());
            processor.process(message.getUniqueKey(), payload);
        } catch (Exception e) {
            handleRetry(message, e);
        }
    }

    /**
     * Re-publishes a failed task with a delay of {@code 1000 * 2^retryCount} ms
     * (2 s, 4 s, 8 s, ...) or gives up once the retry budget is used.
     *
     * @param message the task that failed; its retry count and delay are updated in place
     * @param e       the failure that triggered the retry
     */
    private void handleRetry(DelayTaskMessage message, Throwable e) {
        if (message.getRetryCount() >= message.getMaxRetryCount()) {
            log.error("任务执行失败且超过重试次数: {}", message.getUniqueKey(), e);
            return;
        }
        // Record why the attempt failed; otherwise the cause is lost whenever a retry follows.
        log.warn("任务执行失败,准备重试: {}", message.getUniqueKey(), e);
        message.setRetryCount(message.getRetryCount() + 1);
        message.setDelayTimeMs(1000L * (int) Math.pow(2, message.getRetryCount()));
        try {
            publisher.publish(message);
            log.info("任务重试: {} (第{}次)", message.getUniqueKey(), message.getRetryCount());
        } catch (IOException | RuntimeException ex) {
            // Runtime failures (e.g. Redis unavailable) would otherwise vanish inside the pool's Future.
            log.error("任务重试失败", ex);
        }
    }
}
任务处理器实现
/**
 * {@link DelayTaskProcessor} for order-cancellation tasks carrying an
 * {@link OrderCancelPayload}.
 */
@Slf4j
@Component
public class OrderCancelProcessor implements DelayTaskProcessor<OrderCancelPayload> {

    /**
     * Handles one due order-cancellation task; currently it only logs the task.
     *
     * @param uniqueKey identifier of the task
     * @param payload   decoded order-cancellation details
     */
    @Override
    public void process(String uniqueKey, OrderCancelPayload payload) {
        log.info("处理订单取消任务ID: {},任务内容: {}", uniqueKey, payload);
        // Real business logic goes here.
    }

    /**
     * @return the payload class this processor expects
     */
    @Override
    public Class<OrderCancelPayload> getPayloadClass() {
        return OrderCancelPayload.class;
    }
}
测试
@RestController @RequestMapping("/delay") public class DelayTaskController { // 堆代码 duidaima.com private final DelayTaskPublisher publisher; private final ObjectMapper objectMapper; public DelayTaskController(DelayTaskPublisher publisher, ObjectMapper objectMapper) { this.publisher = publisher; this.objectMapper = objectMapper; } @PostMapping("/order") public ResponseEntity<?> scheduleOrderCancel( @RequestParam String orderId, @RequestParam long delayMs ) throws IOException { OrderCancelPayload payload = new OrderCancelPayload(orderId, "一安未来",LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); DelayTaskMessage message = new DelayTaskMessage( "orderCancelProcessor", orderId, delayMs, objectMapper.writeValueAsString(payload), 3, 0 ); publisher.publish(message); return ResponseEntity.ok().build(); } }
2025-03-21 14:28:26.124 INFO 27276 --- [pool-1-thread-6] org.yian.processor.OrderCancelProcessor : 处理订单取消任务ID: TEST001,任务内容: OrderCancelPayload(orderId=TEST001, desc=一安未来, time=2025-03-21 14:28:23) 2025-03-21 14:28:27.023 INFO 27276 --- [pool-1-thread-7] org.yian.processor.OrderCancelProcessor : 处理订单取消任务ID: TEST002,任务内容: OrderCancelPayload(orderId=TEST002, desc=一安未来, time=2025-03-21 14:28:20) 2025-03-21 14:28:28.722 INFO 27276 --- [pool-1-thread-8] org.yian.processor.OrderCancelProcessor : 处理订单取消任务ID: TEST003,任务内容: OrderCancelPayload(orderId=TEST003, desc=一安未来, time=2025-03-21 14:28:18)总结