public class Send { private final static String QUEUE_NAME = "queue1"; public static void main(String[] args) { // 堆代码 duidaima.com // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、创建连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello world"; // 4、发送消息到指定队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } catch (TimeoutException | IOException e) { e.printStackTrace(); } finally { // 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }Customer
public class Recv { private final static String QUEUE_NAME = "queue1"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setVirtualHost("/"); // 2、获取 Connection和 Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }观察可视化界面,会看到消息先会被写入到队列中,随后又被消费者消费了。
public class Productor { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 消息内容 String message = "hello fanout mode"; // 指定路由key String routeKey = ""; String type = "fanout"; // 3、声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, type); // 4、声明队列 channel.queueDeclare("queue1", true, false, false, null); channel.queueDeclare("queue2", true, false, false, null); channel.queueDeclare("queue3", true, false, false, null); channel.queueDeclare("queue4", true, false, false, null); // 5、绑定 channel 与 queue channel.queueBind("queue1", EXCHANGE_NAME, routeKey); channel.queueBind("queue2", EXCHANGE_NAME, routeKey); channel.queueBind("queue3", EXCHANGE_NAME, routeKey); channel.queueBind("queue4", EXCHANGE_NAME, routeKey); // 6、发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); }finally { // 关闭通道和连接...... } } }Customer
public class Customer { private static Runnable runnable = new Runnable() { @Override public void run() { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(delivery.getEnvelope().getDeliveryTag()); System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }); System.out.println(queueName + ":开始接收消息"); } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道和连接...... } } }; public static void main(String[] args) throws IOException, TimeoutException { // 创建线程分别从四个队列中获取消息 new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); new Thread(runnable, "queue3").start(); new Thread(runnable, "queue4").start(); } }
执行完 Productor 发现四个队列中分别增加了一条消息,而执行完 Customer 后四个队列中的消息都被消费者消费了。
public class Productor { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 消息内容 String message = "hello direct mode"; // 指定路由key String routeKey = "email"; String type = "direct"; // 3、声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, type); // 4、声明队列 channel.queueDeclare("queue1", true, false, false, null); channel.queueDeclare("queue2", true, false, false, null); channel.queueDeclare("queue3", true, false, false, null); // 5、绑定 channel 与 queue channel.queueBind("queue1", EXCHANGE_NAME, "email"); channel.queueBind("queue2", EXCHANGE_NAME, "sms"); channel.queueBind("queue3", EXCHANGE_NAME, "vx"); // 6、发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道和连接...... } } }可以通过可视化页面查看,各队列绑定的 routing_key
public class Productor { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 消息内容 String message = "hello topic mode"; // 指定路由key String routeKey = "com.order.test.xxx"; String type = "topic"; // 3、声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, type); // 4、声明队列 channel.queueDeclare("queue5",true,false,false,null); channel.queueDeclare("queue6",true,false,false,null); // 5、绑定 channel 与 queue channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#"); channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*"); // 6、发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道和连接...... } } }执行完 Productor 后,通过可视化页面查看到,queue 绑定的 routing_key
public class Productor { public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 3、向 Queue1 发布20个消息 for (int i = 0; i < 20; i++) { String msg = "feiyangyang: " + i; channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8)); } System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道和连接...... } } }Worker1
public class Worker1 { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }); System.out.println("Worker1 开始接收消息"); System.in.read(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道和连接...... } } }Worker2 与 Worker1 相同
Worker1 开始接收消息 Worker1:收到消息是:feiyangyang: 0 Worker1:收到消息是:feiyangyang: 2 Worker1:收到消息是:feiyangyang: 4 Worker1:收到消息是:feiyangyang: 6 Worker1:收到消息是:feiyangyang: 8 Worker1:收到消息是:feiyangyang: 10 Worker1:收到消息是:feiyangyang: 12 Worker1:收到消息是:feiyangyang: 14 Worker1:收到消息是:feiyangyang: 16 Worker1:收到消息是:feiyangyang: 18 Worker2 开始接收消息 Worker2:收到消息是:feiyangyang: 1 Worker2:收到消息是:feiyangyang: 3 Worker2:收到消息是:feiyangyang: 5 Worker2:收到消息是:feiyangyang: 7 Worker2:收到消息是:feiyangyang: 9 Worker2:收到消息是:feiyangyang: 11 Worker2:收到消息是:feiyangyang: 13 Worker2:收到消息是:feiyangyang: 15 Worker2:收到消息是:feiyangyang: 17 Worker2:收到消息是:feiyangyang: 19可以看出,轮询分发模式就是将消息均衡的分配所有消费者。
// Channel 使用 Qos 机制 finalChannel.basicQos(1); finalChannel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); try { Thread.sleep(1000); // 改成手动应答 finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } });上述实例相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息,将 Worker1 的 sleep 时间设置为 1s,将 Worker2 的 sleep 时间设置为 2s,查看消息分发结果
Worker1 开始接收消息 Worker1:收到消息是:feiyangyang: 0 Worker1:收到消息是:feiyangyang: 2 Worker1:收到消息是:feiyangyang: 4 Worker1:收到消息是:feiyangyang: 5 Worker1:收到消息是:feiyangyang: 7 Worker1:收到消息是:feiyangyang: 8 Worker1:收到消息是:feiyangyang: 10 Worker1:收到消息是:feiyangyang: 11 Worker1:收到消息是:feiyangyang: 13 Worker1:收到消息是:feiyangyang: 14 Worker1:收到消息是:feiyangyang: 16 Worker1:收到消息是:feiyangyang: 17 Worker1:收到消息是:feiyangyang: 19 Worker2 开始接收消息 Worker2:收到消息是:feiyangyang: 1 Worker2:收到消息是:feiyangyang: 3 Worker2:收到消息是:feiyangyang: 6 Worker2:收到消息是:feiyangyang: 9 Worker2:收到消息是:feiyangyang: 12 Worker2:收到消息是:feiyangyang: 15 Worker2:收到消息是:feiyangyang: 18当使用 Work 公平分发模式时,要设置消费者为手动应答,并且开启 Qos 机制。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);4.2 持久化
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);设置消息持久
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());将消息标记为持久性并不能完全保证消息不会丢失,当 rabbitmq 接收到消息并且还没保存时,仍然有很短的时间窗口会使消息丢失,如果需要更强的保证,可以使用发布者确认机制。