 闽公网安备 35020302035485号
                
                闽公网安备 35020302035485号
                .消息过滤支持在 Broker 端完成,减少网络传输开销
	

version: '3'
# 堆代码 duidaima.com
# 定义自定义网络
networks:
  rmq_network:
    driver: bridge
    ipam:
      config:
        - subnet: 192.168.10.0/24
services:
  # RocketMQ Name Server
  namesrv:
    image: apache/rocketmq:5.1.4
    container_name: rmqnamesrv
    networks:
      rmq_network:
        ipv4_address: 192.168.10.2
    ports:
      - 9876:9876
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
    command: sh mqnamesrv
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m
  # RocketMQ Broker
  broker:
    image: apache/rocketmq:5.3.1
    container_name: rmqbroker
    networks:
      rmq_network:
        ipv4_address: 192.168.10.3
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      - ./conf/broker.conf:/home/rocketmq/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
    environment:
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m
    depends_on:
      - namesrv
  # RocketMQ Dashboard 
  dashboard:
    image: apacherocketmq/rocketmq-dashboard:1.0.0
    container_name: rmqdashboard
    networks:
      rmq_network:
        ipv4_address: 192.168.10.4
    ports:
      - 19080:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv
broker.conf 的内容如下:# broker集群名称 brokerClusterName = DefaultCluster # broker名称 brokerName = broker-a # broker编号,0表示master,大于0表示slave brokerId = 0 # 删除过期文件时间点,默认是凌晨4点 deleteWhen = 04 # 文件保留时间,默认48小时 fileReservedTime = 48 # broker角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点 brokerRole = ASYNC_MASTER # 刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 flushDiskType = ASYNC_FLUSH # nameServer地址,分号分割 namesrvAddr = namesrv:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums = 4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable = true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup = true # brokerIP1 注意:本地测试使用本机的宿主机的IP brokerIP1=192.168.1.109代码示例
@Slf4j
@Service
public class MessageProducerService {
    // RocketMQ消息主题
    public static final String TOPIC_UNICAST = "topic-unicast";
    public static final String TOPIC_BROADCAST = "topic-broadcast";
    public static final String TOPIC_RETRY = "topic-retry";
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送单播消息(点对点)
     * 单播消息会被消费组中的某一个消费者消费
     */
    public void sendUnicastMessage(MessageEvent message) {
        rocketMQTemplate.convertAndSend(TOPIC_UNICAST, message);
        log.info("Unicast message sent: {}", message);
    }
    /**
     * 发送广播消息
     * 广播消息会被所有订阅该主题的消费者消费
     */
    public void sendBroadcastMessage(MessageEvent message) {
        rocketMQTemplate.convertAndSend(TOPIC_BROADCAST, message);
        log.info("Broadcast message sent: {}", message);
    }
    /**
     * 发送需要重试的消息
     * 使用异步发送方式,并在回调中处理发送结果
     */
    public void sendRetryMessage(MessageEvent message) {
        rocketMQTemplate.asyncSend(TOPIC_RETRY, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Retry message sent successfully: {}, result: {}", message, sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                log.error("Failed to send retry message: {}, error: {}", message, throwable.getMessage());
            }
        });
    }
}
2、创建消费者 Service 类来处理消息的接收:@Slf4j
@Service
public class MessageConsumerService {
    // RocketMQ消息主题
    public static final String TOPIC_UNICAST = "topic-unicast";
    public static final String TOPIC_BROADCAST = "topic-broadcast";
    public static final String TOPIC_RETRY = "topic-retry";
    /**
     * 单播消息消费者
     * consumeMode默认为CONCURRENTLY(并发消费)
     */
    @Service
    @RocketMQMessageListener(
            topic = TOPIC_UNICAST,
            consumerGroup = "unicast-consumer-group"
    )
    public class UnicastMessageListener implements RocketMQListener<MessageEvent> {
        @Override
        public void onMessage(MessageEvent message) {
            log.info("Received unicast message: {}", message);
        }
    }
    /**
     * 广播消息消费者
     * messageModel设置为BROADCASTING表示广播模式
     */
    @Service
    @RocketMQMessageListener(
            topic = TOPIC_BROADCAST,
            consumerGroup = "broadcast-consumer-group",
            messageModel = MessageModel.BROADCASTING
    )
    public class BroadcastMessageListener implements RocketMQListener<MessageEvent> {
        @Override
        public void onMessage(MessageEvent message) {
            log.info("Received broadcast message: {}", message);
        }
    }
    /**
     * 重试消息消费者
     * 配置了重试次数和重试间隔
     */
    @Service
    @RocketMQMessageListener(
            topic = TOPIC_RETRY,
            consumerGroup = "retry-consumer-group",
            maxReconsumeTimes = 3,    // 最大重试次数
            delayLevelWhenNextConsume = 2  // 重试间隔级别
    )
    public class RetryMessageListener implements RocketMQListener<MessageEvent> {
        @Override
        public void onMessage(MessageEvent message) {
            try {
                // 模拟处理失败的情况
                if (message.getContent().contains("error")) {
                    throw new RuntimeException("Processing failed, will retry");
                }
                log.info("Received retry message: {}", message);
            } catch (Exception e) {
                log.error("Error processing message: {}, error: {}", message, e.getMessage());
                throw e; // 抛出异常触发重试机制
            }
        }
    }
}
3、创建 MessageController 来进行测试:@RestController
@RequestMapping("/api/messages")
public class MessageController {
    @Autowired
    private MessageProducerService producerService;
    @PostMapping("/unicast")
    public String sendUnicastMessage(@RequestParam String content) {
        MessageEvent message = new MessageEvent()
                .setId(UUID.randomUUID().toString())
                .setContent(content)
                .setTimestamp(System.currentTimeMillis());
        producerService.sendUnicastMessage(message);
        return "Unicast message sent successfully";
    }
    @PostMapping("/broadcast")
    public String sendBroadcastMessage(@RequestParam String content) {
        MessageEvent message = new MessageEvent()
                .setId(UUID.randomUUID().toString())
                .setContent(content)
                .setTimestamp(System.currentTimeMillis());
        producerService.sendBroadcastMessage(message);
        return "Broadcast message sent successfully";
    }
    @PostMapping("/retry")
    public String sendRetryMessage(@RequestParam String content) {
        MessageEvent message = new MessageEvent()
                .setId(UUID.randomUUID().toString())
                .setContent(content)
                .setTimestamp(System.currentTimeMillis());
        producerService.sendRetryMessage(message);
        return "Retry message sent successfully";
    }
}