闽公网安备 35020302035485号
2.在社交平台里,消息延迟推送能够优化用户体验,让信息在合适的时间触达。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
配置文件spring:
redis:
host: localhost
port: 6379
password: root
enabled: true
database: 7
redisson:
# 单节点配置
singleServerConfig:
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 连接超时,单位:毫秒
connectTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 密码
password: root
# 客户端名称
clientName: yian
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# 节点地址
address: redis://localhost:6379
# 发布和订阅连接的最小空闲连接数
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
# 最小空闲连接数
connectionMinimumIdleSize: 32
# 连接池大小
connectionPoolSize: 64
# 数据库编号
database: 7
# DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
# 传输模式
transportMode: "NIO"
消息模型定义@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayTaskMessage implements Serializable {
private static final long serialVersionUID = 1L;
private String messageType;
private String uniqueKey;
private long delayTimeMs;
private String payload;
private int maxRetryCount;
private int retryCount;
}
业务消息体示例@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderCancelPayload implements Serializable {
private static final long serialVersionUID = 2L;
private String orderId;
private String desc;
private String time;
}
核心功能@Service
public class DelayTaskPublisher {
private final RedissonClient redisson;
public DelayTaskPublisher(RedissonClient redisson) {
this.redisson = redisson;
}
public void publish(DelayTaskMessage message) throws IOException {
RQueue<DelayTaskMessage> queue = redisson.getQueue("delay_task_queue");
RDelayedQueue<DelayTaskMessage> delayedQueue = redisson.getDelayedQueue(queue);
delayedQueue.offer(message, message.getDelayTimeMs(), TimeUnit.MILLISECONDS);
}
}
任务执行引擎@Slf4j
@Service
public class DelayTaskExecutor implements Runnable {
private final RedissonClient redisson;
private final Map<String, DelayTaskProcessor> delayTaskProcessors;
private final ExecutorService threadPool;
private final AtomicBoolean running = new AtomicBoolean(false);
private final DelayTaskPublisher publisher;
private final ObjectMapper objectMapper;
public DelayTaskExecutor(RedissonClient redisson,
Map<String, DelayTaskProcessor> delayTaskProcessors,
DelayTaskPublisher publisher,
ObjectMapper objectMapper) {
this.redisson = redisson;
this.delayTaskProcessors = delayTaskProcessors;
this.publisher = publisher;
this.objectMapper = objectMapper;
this.threadPool = Executors.newFixedThreadPool(10);
}
public void start() {
if (!running.getAndSet(true)) {
new Thread(this).start();
}
}
@Override
public void run() {
while (running.get()) {
try {
RQueue<DelayTaskMessage> queue = redisson.getQueue("delay_task_queue");
DelayTaskMessage message = queue.poll();
if (message != null) {
threadPool.submit(() -> processMessage(message));
}
} catch (Exception e) {
log.error("任务处理异常", e);
}
}
}
private void processMessage(DelayTaskMessage message) {
try {
DelayTaskProcessor processor = delayTaskProcessors.get(message.getMessageType());
if (processor == null) {
log.warn("未找到消息处理器: {}", message.getMessageType());
return;
}
Object payload = objectMapper.readValue(
message.getPayload(),
processor.getPayloadClass()
);
processor.process(message.getUniqueKey(), payload);
} catch (Exception e) {
handleRetry(message, e);
}
}
private void handleRetry(DelayTaskMessage message, Throwable e) {
if (message.getRetryCount() < message.getMaxRetryCount()) {
message.setRetryCount(message.getRetryCount() + 1);
message.setDelayTimeMs(1000L * (int) Math.pow(2, message.getRetryCount()));
try {
publisher.publish(message);
log.info("任务重试: {} (第{}次)", message.getUniqueKey(), message.getRetryCount());
} catch (IOException ex) {
log.error("任务重试失败", ex);
}
} else {
log.error("任务执行失败且超过重试次数: {}", message.getUniqueKey(), e);
}
}
}
任务处理器实现@Slf4j
@Component
public class OrderCancelProcessor implements DelayTaskProcessor<OrderCancelPayload> {
@Override
public Class<OrderCancelPayload> getPayloadClass() {
return OrderCancelPayload.class;
}
@Override
public void process(String uniqueKey, OrderCancelPayload payload) {
log.info("处理订单取消任务ID: {},任务内容: {}", uniqueKey,payload);
// 实际业务逻辑
}
}
测试@RestController
@RequestMapping("/delay")
public class DelayTaskController {
// 堆代码 duidaima.com
private final DelayTaskPublisher publisher;
private final ObjectMapper objectMapper;
public DelayTaskController(DelayTaskPublisher publisher,
ObjectMapper objectMapper) {
this.publisher = publisher;
this.objectMapper = objectMapper;
}
@PostMapping("/order")
public ResponseEntity<?> scheduleOrderCancel(
@RequestParam String orderId,
@RequestParam long delayMs
) throws IOException {
OrderCancelPayload payload = new OrderCancelPayload(orderId, "一安未来",LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
DelayTaskMessage message = new DelayTaskMessage(
"orderCancelProcessor",
orderId,
delayMs,
objectMapper.writeValueAsString(payload),
3,
0
);
publisher.publish(message);
return ResponseEntity.ok().build();
}
}

2025-03-21 14:28:26.124 INFO 27276 --- [pool-1-thread-6] org.yian.processor.OrderCancelProcessor : 处理订单取消任务ID: TEST001,任务内容: OrderCancelPayload(orderId=TEST001, desc=一安未来, time=2025-03-21 14:28:23) 2025-03-21 14:28:27.023 INFO 27276 --- [pool-1-thread-7] org.yian.processor.OrderCancelProcessor : 处理订单取消任务ID: TEST002,任务内容: OrderCancelPayload(orderId=TEST002, desc=一安未来, time=2025-03-21 14:28:20) 2025-03-21 14:28:28.722 INFO 27276 --- [pool-1-thread-8] org.yian.processor.OrderCancelProcessor : 处理订单取消任务ID: TEST003,任务内容: OrderCancelPayload(orderId=TEST003, desc=一安未来, time=2025-03-21 14:28:18)总结