使用RabbitMQ原生的API来操作消息的生产和消费。首先先介绍一个简单的一个消息推送到接收的流程,提供一个简单的图:
黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是RabbitMQ的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键是有规则的。 简单地介绍下规则: *(星号) 用来表示一个单词 (必须出现的) #(井号) 用来表示任意数量(零个或多个)单词 通配的路由键是跟队列进行绑定的,举个小例子: 队列Q1路由键为*.TT.*,队列Q2路由键为TT.#; 如果一条消息携带的路由值为A.TT.B,那么队列Q1将会收到; 如果一条消息携带的路由值为TT.AA.BB,那么队列Q2将会收到。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>二、生产消息
public class Producer { public static final String HOST = "148.70.153.63"; public static final String USER_NAME = "libai"; public static void main(String[] args) throws IOException, TimeoutException { Producer producer = new Producer(); // 获取connectionFactory ConnectionFactory connectionFactory = producer.getConnectionFactory(); // 创建连接和通道 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "amq.direct", queueName = "apiQueue1", routingKey = "RabbitMQ"; // 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); // 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数 channel.queueDeclare(queueName, true, false, false, null); // 将队列和交换机绑定并指定路由键 channel.queueBind(queueName, exchangeName, routingKey); // 发送消息 String msg = "Hello RabbitMQ!"; // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节 channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); // 释放连接 channel.close(); connection.close(); } /** * 获取连接工厂 * 堆代码 duidaima.com * @return */ public ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(AMQP.PROTOCOL.PORT); connectionFactory.setUsername(USER_NAME); connectionFactory.setPassword(System.getProperty("password")); connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST); return connectionFactory; } }1.创建连接Connection和通道Channel。
@Slf4j public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Producer producer = new Producer(); // 获取connectionFactory ConnectionFactory connectionFactory = producer.getConnectionFactory(); // 创建连接和通道 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 消费消息-客户端主动拉取模式、自动ACK确认 // 性能较差,每次都要创建、关闭连接和通道。 GetResponse response = channel.basicGet("apiQueue1", true); String msg = Optional.ofNullable(response).map(GetResponse::getBody).map(String::new).orElse(null); log.info("消费消息:[{}]", msg); channel.close(); connection.close(); } }这种模式在创建连接和通道后,主动从server中拉取消息,效率、性能比较低下。
@Slf4j public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Producer producer = new Producer(); // 获取connectionFactory ConnectionFactory connectionFactory = producer.getConnectionFactory(); // 创建连接和通道 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 消费消息-server推送模式,创建连接和通道后,等待队列推送消息然后进行消费 // 不能关闭连接和通道 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long now = System.currentTimeMillis(); if (now % 3 == 0) { log.info("手动确认消费消息:[{}]", new String(body)); // 消息唯一标记、是否确认多条(true则批量确认小于当前标记号的所有消息) channel.basicAck(envelope.getDeliveryTag(), false); } else if (now % 3 == 1) { log.info("basicNack:[手动拒绝消息,重回队列]"); // 消息唯一标记、是否拒绝多条(true则批量拒绝小于当前标记号的所有消息)、是否重回队列 // 重回队列后仍旧会被当前消费者再次消费 channel.basicNack(envelope.getDeliveryTag(), false, true); } else if (now % 3 == 2) { log.info("basicReject:[手动拒绝消息,重回队列]"); // 只能拒绝单条消息,消息唯一标记、是否重回队列 // 重回队列后仍旧会被当前消费者再次消费 channel.basicReject(envelope.getDeliveryTag(), true); } } }; // 进入消费状态、手动ACK channel.basicConsume("apiQueue1", false, consumer); } }这种模式在创建连接和通道后就进入等待消费的状态,等待队列推送消息然后进行消费。 手动ACK模式,在成功消费消息后需要手动ACK确认,也可以拒绝当前消息并指定是否重回队列中被再次消费。