• 如何实现高效可靠的分布式延时任务?
  • 发布于 1个月前
  • 82 热度
    0 评论
前言
在分布式系统蓬勃发展的当下,延时任务处理已成为众多业务场景中不可或缺的关键环节。
1.以电商领域为例,订单超时未支付时需自动取消,避免库存长时间占用;

2.在社交平台里,消息延迟推送能够优化用户体验,让信息在合适的时间触达。


然而实现高效可靠的分布式延时任务并非易事,需要综合考量系统架构、消息队列性能、任务重试机制等诸多因素。本文聚焦于这一实际需求,基于Spring Boot和Redisson框架,精心打造一个开箱即用的分布式延时任务组件。该组件不仅具备基本的延时任务处理能力,还支持灵活的消息类型扩展,旨在为开发者提供一套便捷、高效且可定制的解决方案,助力其在分布式项目中轻松应对各类延时任务场景。

实现
依赖引入
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.15.6</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
配置文件
spring:
  redis:
    host: localhost
    port: 6379
    password: root
    enabled: true
    database: 7
    redisson:
      # 单节点配置
      singleServerConfig:
        # 连接空闲超时,单位:毫秒
        idleConnectionTimeout: 10000
        # 连接超时,单位:毫秒
        connectTimeout: 10000
        # 命令等待超时,单位:毫秒
        timeout: 3000
        # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
        # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
        retryAttempts: 3
        # 命令重试发送时间间隔,单位:毫秒
        retryInterval: 1500
        # 密码
        password: root
        # 客户端名称
        clientName: yian
        # 单个连接最大订阅数量
        subscriptionsPerConnection: 5
        # 节点地址
        address: redis://localhost:6379
        # 发布和订阅连接的最小空闲连接数
        subscriptionConnectionMinimumIdleSize: 1
        # 发布和订阅连接池大小
        subscriptionConnectionPoolSize: 50
        # 最小空闲连接数
        connectionMinimumIdleSize: 32
        # 连接池大小
        connectionPoolSize: 64
        # 数据库编号
        database: 7
        # DNS监测时间间隔,单位:毫秒
        dnsMonitoringInterval: 5000
        # 传输模式
      transportMode: "NIO"
消息模型定义
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayTaskMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    private String messageType;
    private String uniqueKey;
    private long delayTimeMs;
    private String payload;
    private int maxRetryCount;
    private int retryCount;
}
业务消息体示例
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderCancelPayload implements Serializable {
    private static final long serialVersionUID = 2L;
    private String orderId;
    private String desc;
    private String time;
}
核心功能
任务发布服务
@Service
public class DelayTaskPublisher {

    private final RedissonClient redisson;

    public DelayTaskPublisher(RedissonClient redisson) {
        this.redisson = redisson;
    }

    public void publish(DelayTaskMessage message) throws IOException {
        RQueue<DelayTaskMessage> queue = redisson.getQueue("delay_task_queue");
        RDelayedQueue<DelayTaskMessage> delayedQueue = redisson.getDelayedQueue(queue);

        delayedQueue.offer(message, message.getDelayTimeMs(), TimeUnit.MILLISECONDS);
    }
}
任务执行引擎
@Slf4j
@Service
public class DelayTaskExecutor implements Runnable {

    private final RedissonClient redisson;
    private final Map<String, DelayTaskProcessor> delayTaskProcessors;
    private final ExecutorService threadPool;
    private final AtomicBoolean running = new AtomicBoolean(false);

    private final DelayTaskPublisher publisher;
    private final ObjectMapper objectMapper;

    public DelayTaskExecutor(RedissonClient redisson,
                             Map<String, DelayTaskProcessor> delayTaskProcessors,
                             DelayTaskPublisher publisher,
                             ObjectMapper objectMapper) {
        this.redisson = redisson;
        this.delayTaskProcessors = delayTaskProcessors;
        this.publisher = publisher;
        this.objectMapper = objectMapper;
        this.threadPool = Executors.newFixedThreadPool(10);
    }

    public void start() {
        if (!running.getAndSet(true)) {
            new Thread(this).start();
        }
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                RQueue<DelayTaskMessage> queue = redisson.getQueue("delay_task_queue");

                DelayTaskMessage message = queue.poll();
                if (message != null) {
                    threadPool.submit(() -> processMessage(message));
                }
            } catch (Exception e) {
                log.error("任务处理异常", e);
            }
        }
    }

    private void processMessage(DelayTaskMessage message) {
        try {
            DelayTaskProcessor processor = delayTaskProcessors.get(message.getMessageType());
            if (processor == null) {
                log.warn("未找到消息处理器: {}", message.getMessageType());
                return;
            }

            Object payload = objectMapper.readValue(
                    message.getPayload(),
                    processor.getPayloadClass()
            );

            processor.process(message.getUniqueKey(), payload);
        } catch (Exception e) {
            handleRetry(message, e);
        }
    }

    private void handleRetry(DelayTaskMessage message, Throwable e) {
        if (message.getRetryCount() < message.getMaxRetryCount()) {
            message.setRetryCount(message.getRetryCount() + 1);
            message.setDelayTimeMs(1000L * (int) Math.pow(2, message.getRetryCount()));
            try {
                publisher.publish(message);
                log.info("任务重试: {} (第{}次)", message.getUniqueKey(), message.getRetryCount());
            } catch (IOException ex) {
                log.error("任务重试失败", ex);
            }
        } else {
            log.error("任务执行失败且超过重试次数: {}", message.getUniqueKey(), e);
        }
    }
}
任务处理器实现
@Slf4j
@Component
public class OrderCancelProcessor implements DelayTaskProcessor<OrderCancelPayload> {

    @Override
    public Class<OrderCancelPayload> getPayloadClass() {
        return OrderCancelPayload.class;
    }

    @Override
    public void process(String uniqueKey, OrderCancelPayload payload) {
        log.info("处理订单取消任务ID: {},任务内容: {}", uniqueKey,payload);
        // 实际业务逻辑
    }
}
测试
@RestController
@RequestMapping("/delay")
public class DelayTaskController {
    // 堆代码 duidaima.com
    private final DelayTaskPublisher publisher;
    private final ObjectMapper objectMapper;

    public DelayTaskController(DelayTaskPublisher publisher,
                               ObjectMapper objectMapper) {
        this.publisher = publisher;
        this.objectMapper = objectMapper;
    }

    @PostMapping("/order")
    public ResponseEntity<?> scheduleOrderCancel(
            @RequestParam String orderId,
            @RequestParam long delayMs
    ) throws IOException {
        OrderCancelPayload payload = new OrderCancelPayload(orderId, "一安未来",LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

        DelayTaskMessage message = new DelayTaskMessage(
                "orderCancelProcessor",
                orderId,
                delayMs,
                objectMapper.writeValueAsString(payload),
                3,
                0
        );

        publisher.publish(message);
        return ResponseEntity.ok().build();
    }
}

2025-03-21 14:28:26.124  INFO 27276 --- [pool-1-thread-6] org.yian.processor.OrderCancelProcessor  : 处理订单取消任务ID: TEST001,任务内容: OrderCancelPayload(orderId=TEST001, desc=一安未来, time=2025-03-21 14:28:23)
2025-03-21 14:28:27.023  INFO 27276 --- [pool-1-thread-7] org.yian.processor.OrderCancelProcessor  : 处理订单取消任务ID: TEST002,任务内容: OrderCancelPayload(orderId=TEST002, desc=一安未来, time=2025-03-21 14:28:20)
2025-03-21 14:28:28.722  INFO 27276 --- [pool-1-thread-8] org.yian.processor.OrderCancelProcessor  : 处理订单取消任务ID: TEST003,任务内容: OrderCancelPayload(orderId=TEST003, desc=一安未来, time=2025-03-21 14:28:18)
总结
通过测试验证,该组件能够成功处理订单取消等延时任务,开发者可以根据自身业务需求,基于此组件快速定制和扩展不同类型的延时任务,为分布式系统的高效运行提供有力支持。
用户评论