.消息过滤支持在 Broker 端完成,减少网络传输开销
version: '3'
# Duidaima (duidaima.com)

# Define a custom bridge network with a fixed subnet so that every
# container below can be given a static IPv4 address.
networks:
  rmq_network:
    driver: bridge
    ipam:
      config:
        - subnet: 192.168.10.0/24

services:
  # RocketMQ Name Server
  namesrv:
    # Pinned to the same release as the broker service so that both
    # components of the cluster run one RocketMQ version.
    image: apache/rocketmq:5.3.1
    container_name: rmqnamesrv
    networks:
      rmq_network:
        ipv4_address: 192.168.10.2
    ports:
      - 9876:9876
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
    command: sh mqnamesrv
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m

  # RocketMQ Broker
  broker:
    image: apache/rocketmq:5.3.1
    container_name: rmqbroker
    networks:
      rmq_network:
        ipv4_address: 192.168.10.3
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      # Broker configuration file supplied from the host
      - ./conf/broker.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m
    depends_on:
      - namesrv

  # RocketMQ Dashboard
  dashboard:
    image: apacherocketmq/rocketmq-dashboard:1.0.0
    container_name: rmqdashboard
    networks:
      rmq_network:
        ipv4_address: 192.168.10.4
    ports:
      # Dashboard UI is published on host port 19080
      - 19080:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv

broker.conf 的内容如下:
# Broker cluster name
brokerClusterName = DefaultCluster
# Broker name
brokerName = broker-a
# Broker ID: 0 means master, a value greater than 0 means slave
brokerId = 0
# Time of day at which expired files are deleted; the default is 4 AM
deleteWhen = 04
# File retention time; the default is 48 hours
fileReservedTime = 48
# Broker role: ASYNC_MASTER = asynchronous-replication master,
# SYNC_MASTER = synchronous double-write master, SLAVE = slave node
brokerRole = ASYNC_MASTER
# Disk flush mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
flushDiskType = ASYNC_FLUSH
# Name server address(es), separated by semicolons
namesrvAddr = namesrv:9876
# Default number of queues for a topic that is auto-created when a message
# is sent to a topic that does not yet exist on the server
defaultTopicQueueNums = 4
# Whether the broker may auto-create topics; recommended on for
# non-production environments and off for production
autoCreateTopicEnable = true
# Whether the broker may auto-create subscription groups; recommended on for
# non-production environments and off for production
autoCreateSubscriptionGroup = true
# brokerIP1 - note: for local testing, use the IP address of the host machine
brokerIP1=192.168.1.109

代码示例
@Slf4j @Service public class MessageProducerService { // RocketMQ消息主题 public static final String TOPIC_UNICAST = "topic-unicast"; public static final String TOPIC_BROADCAST = "topic-broadcast"; public static final String TOPIC_RETRY = "topic-retry"; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送单播消息(点对点) * 单播消息会被消费组中的某一个消费者消费 */ public void sendUnicastMessage(MessageEvent message) { rocketMQTemplate.convertAndSend(TOPIC_UNICAST, message); log.info("Unicast message sent: {}", message); } /** * 发送广播消息 * 广播消息会被所有订阅该主题的消费者消费 */ public void sendBroadcastMessage(MessageEvent message) { rocketMQTemplate.convertAndSend(TOPIC_BROADCAST, message); log.info("Broadcast message sent: {}", message); } /** * 发送需要重试的消息 * 使用异步发送方式,并在回调中处理发送结果 */ public void sendRetryMessage(MessageEvent message) { rocketMQTemplate.asyncSend(TOPIC_RETRY, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("Retry message sent successfully: {}, result: {}", message, sendResult); } @Override public void onException(Throwable throwable) { log.error("Failed to send retry message: {}, error: {}", message, throwable.getMessage()); } }); } }2、创建消费者 Service 类来处理消息的接收:
@Slf4j @Service public class MessageConsumerService { // RocketMQ消息主题 public static final String TOPIC_UNICAST = "topic-unicast"; public static final String TOPIC_BROADCAST = "topic-broadcast"; public static final String TOPIC_RETRY = "topic-retry"; /** * 单播消息消费者 * consumeMode默认为CONCURRENTLY(并发消费) */ @Service @RocketMQMessageListener( topic = TOPIC_UNICAST, consumerGroup = "unicast-consumer-group" ) public class UnicastMessageListener implements RocketMQListener<MessageEvent> { @Override public void onMessage(MessageEvent message) { log.info("Received unicast message: {}", message); } } /** * 广播消息消费者 * messageModel设置为BROADCASTING表示广播模式 */ @Service @RocketMQMessageListener( topic = TOPIC_BROADCAST, consumerGroup = "broadcast-consumer-group", messageModel = MessageModel.BROADCASTING ) public class BroadcastMessageListener implements RocketMQListener<MessageEvent> { @Override public void onMessage(MessageEvent message) { log.info("Received broadcast message: {}", message); } } /** * 重试消息消费者 * 配置了重试次数和重试间隔 */ @Service @RocketMQMessageListener( topic = TOPIC_RETRY, consumerGroup = "retry-consumer-group", maxReconsumeTimes = 3, // 最大重试次数 delayLevelWhenNextConsume = 2 // 重试间隔级别 ) public class RetryMessageListener implements RocketMQListener<MessageEvent> { @Override public void onMessage(MessageEvent message) { try { // 模拟处理失败的情况 if (message.getContent().contains("error")) { throw new RuntimeException("Processing failed, will retry"); } log.info("Received retry message: {}", message); } catch (Exception e) { log.error("Error processing message: {}, error: {}", message, e.getMessage()); throw e; // 抛出异常触发重试机制 } } } }3、创建 MessageController 来进行测试:
/**
 * REST endpoints under {@code /api/messages} for exercising the producer.
 * Each endpoint wraps the {@code content} request parameter in a new
 * {@code MessageEvent} and passes it to {@code MessageProducerService}.
 */
@RestController
@RequestMapping("/api/messages")
public class MessageController {

    @Autowired
    private MessageProducerService producerService;

    /**
     * POST {@code /unicast}: publishes the content as a unicast message.
     *
     * @param content text to carry in the message
     * @return a fixed confirmation string
     */
    @PostMapping("/unicast")
    public String sendUnicastMessage(@RequestParam String content) {
        producerService.sendUnicastMessage(newEvent(content));
        return "Unicast message sent successfully";
    }

    /**
     * POST {@code /broadcast}: publishes the content as a broadcast message.
     *
     * @param content text to carry in the message
     * @return a fixed confirmation string
     */
    @PostMapping("/broadcast")
    public String sendBroadcastMessage(@RequestParam String content) {
        producerService.sendBroadcastMessage(newEvent(content));
        return "Broadcast message sent successfully";
    }

    /**
     * POST {@code /retry}: publishes the content to the retry topic.
     *
     * @param content text to carry in the message
     * @return a fixed confirmation string
     */
    @PostMapping("/retry")
    public String sendRetryMessage(@RequestParam String content) {
        producerService.sendRetryMessage(newEvent(content));
        return "Retry message sent successfully";
    }

    /** Builds an event with a random UUID id, the given content and the current time in milliseconds. */
    private MessageEvent newEvent(String content) {
        String id = UUID.randomUUID().toString();
        MessageEvent event = new MessageEvent()
                .setId(id)
                .setContent(content);
        return event.setTimestamp(System.currentTimeMillis());
    }
}