• RocketMQ实现延时消息的实现方法
  • 发布于 2个月前
  • 243 热度
    0 评论
一. 引言
当搭建IoT管理后台后,APP、设备、云端三端就可以实现交互。当点击APP中的控制按钮,其控制指令就可以经过云端转发到设备执行,当设备执行后将设备的状态上报到云端,APP通过轮训可以取到设备此时的状态,在控件上展示控制结果。

以上的控制流程属于即时控制,便于理解。在人们使用设备的场景中,存在一种定时/延时的场景,比如这种家里的烟机延时3分钟后再关机,那定时/延时场景是如何实现的呢?有什么较好的方式来实现?

目前有两种方式进行实现:
1.传统方式

2.使用消息中间件


二. 实现定时的方法
A.传统方法
其工作原理是在APP端设置一个未来的时间戳和运行的周期,这些表示未来时间的数据和控制指令会被放入云端数据库存储起来,云端会启动定时任务,比如每隔5s去扫描数据库数据,如果时间满足,就会将设置的控制命令下发到设备,从而实现了设备控制。

以上的方式属于传统基于数据库的定时调度方案,在分布式场景下,性能不高,定时精度不高,实现复杂。因此,接下来提出基于RocketMQ的定时消息实现定时调度任务。

B.RocketMQ定时任务
首先展示一下,RocketMQ实现延时消息的流程图:

如图所示,用户在APP设置的控制指令和定时时间会发送到RocketMQ队列中存储起来,当时间到指定时间后,作为消费者的推送平台才能把控制指令推送给设备。这种基于成熟的消息中间件的方式有着以下的优势:

1.精度高、开发门槛低
2.高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。
接下来以代码的方式介绍一下如何实现消息延时

要在SpringBoot中使用RocketMQ进行延时消息发送,需要使用RocketMQ的定时消息功能,一共4步。
1.在pom.xml添加依赖:
<dependency>    
    <groupId>org.apache.rocketmq</groupId>    
    <artifactId>rocketmq-client</artifactId>    
    <version>${rocketmq-client.version}</version>
</dependency>
<dependency>    
    <groupId>org.apache.rocketmq</groupId>    
    <artifactId>rocketmq-spring-boot-starter</artifactId>    
    <version>${rocketmq-spring-boot-starter.version}</version>
</dependency>
2.在application.properties文件中添加以下配置:
rocketmq.name-server=your-nameserver-ip:9876
rocketmq.producer.group=your-group-name
3.创建一个RocketMQ生产者类:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RocketMQProducer {    
    @Autowired    
    private RocketMQTemplate rocketMQTemplate;    
    
    public void sendDelayMessage(String topic, Object message, long timeout, int delayLevel) {
        // 堆代码 duidaima.com
        // 封装消息
        Message<Object> msg = MessageBuilder.withPayload(message).build();
        // 发送消息
        rocketMQTemplate.syncSend(topic, msg, timeout, delayLevel);    
 }
}
如上在代码中添加了rocketMQTemplate.syncSend(topic, msg, timeout, delayLevel); 这句代码设置了delayLevel。
其延时level表默认配置如下:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
解释:
level有以下三种情况:level == 0,消息为非延迟消息;1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s;level > maxLevel,则level== maxLevel,例如level==20,延迟2h。

4.在需要发送延时消息的地方调用sendDelayMessage方法:
rocketMQProducer.sendDelayMessage("your-topic-name", "your-message", "timeout", 3);
这句代码设置了delayLevel=3,对应messageDelayLevel中的10s,表示消息需要延时10s才能消费。

四. 小结
综上所述,以上介绍了RocketMQ实现延时消息的实现方法,在实现延时的时候,需要根据业务提前确认好delayLevel,该方式也常用于延时关闭支付订单。给个思考,如果想自定义延时时间,比如33分钟,代码应该如何改呢?
用户评论