闽公网安备 35020302035485号
.消息过滤支持在 Broker 端完成,减少网络传输开销

version: '3'
# 堆代码 duidaima.com
# 定义自定义网络
networks:
rmq_network:
driver: bridge
ipam:
config:
- subnet: 192.168.10.0/24
services:
# RocketMQ Name Server
namesrv:
image: apache/rocketmq:5.1.4
container_name: rmqnamesrv
networks:
rmq_network:
ipv4_address: 192.168.10.2
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
environment:
- JAVA_OPT_EXT=-server -Xms512m -Xmx512m
# RocketMQ Broker
broker:
image: apache/rocketmq:5.3.1
container_name: rmqbroker
networks:
rmq_network:
ipv4_address: 192.168.10.3
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./conf/broker.conf:/home/rocketmq/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/conf/broker.conf
environment:
- JAVA_OPT_EXT=-server -Xms512m -Xmx512m
depends_on:
- namesrv
# RocketMQ Dashboard
dashboard:
image: apacherocketmq/rocketmq-dashboard:1.0.0
container_name: rmqdashboard
networks:
rmq_network:
ipv4_address: 192.168.10.4
ports:
- 19080:8080
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
depends_on:
- namesrv
broker.conf 的内容如下:# broker集群名称 brokerClusterName = DefaultCluster # broker名称 brokerName = broker-a # broker编号,0表示master,大于0表示slave brokerId = 0 # 删除过期文件时间点,默认是凌晨4点 deleteWhen = 04 # 文件保留时间,默认48小时 fileReservedTime = 48 # broker角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点 brokerRole = ASYNC_MASTER # 刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 flushDiskType = ASYNC_FLUSH # nameServer地址,分号分割 namesrvAddr = namesrv:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums = 4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable = true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup = true # brokerIP1 注意:本地测试使用本机的宿主机的IP brokerIP1=192.168.1.109代码示例
@Slf4j
@Service
public class MessageProducerService {
// RocketMQ消息主题
public static final String TOPIC_UNICAST = "topic-unicast";
public static final String TOPIC_BROADCAST = "topic-broadcast";
public static final String TOPIC_RETRY = "topic-retry";
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送单播消息(点对点)
* 单播消息会被消费组中的某一个消费者消费
*/
public void sendUnicastMessage(MessageEvent message) {
rocketMQTemplate.convertAndSend(TOPIC_UNICAST, message);
log.info("Unicast message sent: {}", message);
}
/**
* 发送广播消息
* 广播消息会被所有订阅该主题的消费者消费
*/
public void sendBroadcastMessage(MessageEvent message) {
rocketMQTemplate.convertAndSend(TOPIC_BROADCAST, message);
log.info("Broadcast message sent: {}", message);
}
/**
* 发送需要重试的消息
* 使用异步发送方式,并在回调中处理发送结果
*/
public void sendRetryMessage(MessageEvent message) {
rocketMQTemplate.asyncSend(TOPIC_RETRY, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Retry message sent successfully: {}, result: {}", message, sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("Failed to send retry message: {}, error: {}", message, throwable.getMessage());
}
});
}
}
2、创建消费者 Service 类来处理消息的接收:@Slf4j
@Service
public class MessageConsumerService {
// RocketMQ消息主题
public static final String TOPIC_UNICAST = "topic-unicast";
public static final String TOPIC_BROADCAST = "topic-broadcast";
public static final String TOPIC_RETRY = "topic-retry";
/**
* 单播消息消费者
* consumeMode默认为CONCURRENTLY(并发消费)
*/
@Service
@RocketMQMessageListener(
topic = TOPIC_UNICAST,
consumerGroup = "unicast-consumer-group"
)
public class UnicastMessageListener implements RocketMQListener<MessageEvent> {
@Override
public void onMessage(MessageEvent message) {
log.info("Received unicast message: {}", message);
}
}
/**
* 广播消息消费者
* messageModel设置为BROADCASTING表示广播模式
*/
@Service
@RocketMQMessageListener(
topic = TOPIC_BROADCAST,
consumerGroup = "broadcast-consumer-group",
messageModel = MessageModel.BROADCASTING
)
public class BroadcastMessageListener implements RocketMQListener<MessageEvent> {
@Override
public void onMessage(MessageEvent message) {
log.info("Received broadcast message: {}", message);
}
}
/**
* 重试消息消费者
* 配置了重试次数和重试间隔
*/
@Service
@RocketMQMessageListener(
topic = TOPIC_RETRY,
consumerGroup = "retry-consumer-group",
maxReconsumeTimes = 3, // 最大重试次数
delayLevelWhenNextConsume = 2 // 重试间隔级别
)
public class RetryMessageListener implements RocketMQListener<MessageEvent> {
@Override
public void onMessage(MessageEvent message) {
try {
// 模拟处理失败的情况
if (message.getContent().contains("error")) {
throw new RuntimeException("Processing failed, will retry");
}
log.info("Received retry message: {}", message);
} catch (Exception e) {
log.error("Error processing message: {}, error: {}", message, e.getMessage());
throw e; // 抛出异常触发重试机制
}
}
}
}
3、创建 MessageController 来进行测试:@RestController
@RequestMapping("/api/messages")
public class MessageController {
@Autowired
private MessageProducerService producerService;
@PostMapping("/unicast")
public String sendUnicastMessage(@RequestParam String content) {
MessageEvent message = new MessageEvent()
.setId(UUID.randomUUID().toString())
.setContent(content)
.setTimestamp(System.currentTimeMillis());
producerService.sendUnicastMessage(message);
return "Unicast message sent successfully";
}
@PostMapping("/broadcast")
public String sendBroadcastMessage(@RequestParam String content) {
MessageEvent message = new MessageEvent()
.setId(UUID.randomUUID().toString())
.setContent(content)
.setTimestamp(System.currentTimeMillis());
producerService.sendBroadcastMessage(message);
return "Broadcast message sent successfully";
}
@PostMapping("/retry")
public String sendRetryMessage(@RequestParam String content) {
MessageEvent message = new MessageEvent()
.setId(UUID.randomUUID().toString())
.setContent(content)
.setTimestamp(System.currentTimeMillis());
producerService.sendRetryMessage(message);
return "Retry message sent successfully";
}
}