闽公网安备 35020302035485号
插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编 写自己的插件。
RabbitMQ的消息模型


brew install rabbitmq
Windows端
1.安装Erlang
2.安装RabbitMQ

rabbitmq-plugins enable rabbitmq_management
访问地址查看是否安装成功:http://localhost:15672/
# rabbitmq依赖erlang 需要自己去自行下载 cd /path/to/erlang-sound-code && ./configure --prefix=/usr/local/erlang make && make install vim /etc/profile # 添加 export PATH=$PATH:/usr/local/erlang/bin source /etc/profile # 输入erl,会出现版本信息,即安装成功安装rabbitmq
#下载 abbitmq_server-3.8.16 并移动到/usr/local/下 vim /etc/profile # 添加 export PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin source /etc/profile cd /usr/local/rabbitmq_server-3.8.16/sbin # 启动 ./rabbitmq-server start

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml添加RabbitMQ的相关配置spring:
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: im # rabbitmq的用户名
password: xxxxxx # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true
接下来创建RabbitMQ的java配置,主要用于配置交换机、队列和绑定关系/**
* 消息队列配置
*/
@Configuration
public class RabbitMqConfig {
/**
* 机器人消息重启插件消息队列所绑定的交换机
*/
@Bean
CustomExchange chatPluginDirect() {
//创建一个自定义交换机,可以发送延迟消息
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(), "x-delayed-message", true, false, args);
}
/**
* 机器人消息重启插件队列
*/
@Bean
public Queue chatPluginQueue() {
return new Queue(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getName());
}
/**
* 将机器人消息重启插件队列绑定到交换机
*/
@Bean
public Binding chatPluginBinding(CustomExchange chatPluginDirect, Queue chatPluginQueue) {
return BindingBuilder
.bind(chatPluginQueue)
.to(chatPluginDirect)
.with(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey())
.noargs();
}
}
创建一个消息的发出者,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间/**
* 机器人重启队列发出者
*/
@Component
@Slf4j
public class ChatQueueSender {
private static Logger LOGGER = LoggerFactory.getLogger(ChatQueueSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessageToChat(Long cmid, final long delayTimes) {
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(), QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey(), cmid, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setHeader("x-delay", delayTimes);
return message;
}
});
}
}
创建一个消息的接收者,用于处理延迟插件队列中的消息。/**
* 机器人重启队列处理者
*/
@Component
@Slf4j
@RabbitListener(queues = "im.chat.cancel")
public class ChatQueueReceiver {
@Autowired
private ChatRestartRobotService chatRestartRobotService;
@RabbitHandler
public void handleOnChat(Long cmid) {
// log.info("机器人会话重启");
chatRestartRobotService.restartRobot(cmid);
}
}
最后,在对应的地方调用即可: