插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编 写自己的插件。
RabbitMQ的消息模型
brew install rabbitmq
Windows端
1.安装Erlang
2.安装RabbitMQ
rabbitmq-plugins enable rabbitmq_management
访问地址查看是否安装成功:http://localhost:15672/
# rabbitmq依赖erlang 需要自己去自行下载 cd /path/to/erlang-sound-code && ./configure --prefix=/usr/local/erlang make && make install vim /etc/profile # 添加 export PATH=$PATH:/usr/local/erlang/bin source /etc/profile # 输入erl,会出现版本信息,即安装成功安装rabbitmq
#下载 abbitmq_server-3.8.16 并移动到/usr/local/下 vim /etc/profile # 添加 export PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin source /etc/profile cd /usr/local/rabbitmq_server-3.8.16/sbin # 启动 ./rabbitmq-server start
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
<!--消息队列相关依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>在application.yml添加RabbitMQ的相关配置
spring: rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /mall # rabbitmq的虚拟host username: im # rabbitmq的用户名 password: xxxxxx # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true接下来创建RabbitMQ的java配置,主要用于配置交换机、队列和绑定关系
/** * 消息队列配置 */ @Configuration public class RabbitMqConfig { /** * 机器人消息重启插件消息队列所绑定的交换机 */ @Bean CustomExchange chatPluginDirect() { //创建一个自定义交换机,可以发送延迟消息 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(), "x-delayed-message", true, false, args); } /** * 机器人消息重启插件队列 */ @Bean public Queue chatPluginQueue() { return new Queue(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getName()); } /** * 将机器人消息重启插件队列绑定到交换机 */ @Bean public Binding chatPluginBinding(CustomExchange chatPluginDirect, Queue chatPluginQueue) { return BindingBuilder .bind(chatPluginQueue) .to(chatPluginDirect) .with(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey()) .noargs(); } }创建一个消息的发出者,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
/** * 机器人重启队列发出者 */ @Component @Slf4j public class ChatQueueSender { private static Logger LOGGER = LoggerFactory.getLogger(ChatQueueSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessageToChat(Long cmid, final long delayTimes) { //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getExchange(), QueueEnum.QUEUE_RESET_MESSAGE_CANCEL.getRouteKey(), cmid, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setHeader("x-delay", delayTimes); return message; } }); } }创建一个消息的接收者,用于处理延迟插件队列中的消息。
/** * 机器人重启队列处理者 */ @Component @Slf4j @RabbitListener(queues = "im.chat.cancel") public class ChatQueueReceiver { @Autowired private ChatRestartRobotService chatRestartRobotService; @RabbitHandler public void handleOnChat(Long cmid) { // log.info("机器人会话重启"); chatRestartRobotService.restartRobot(cmid); } }最后,在对应的地方调用即可: