// 订单服务:生产者 Order order = createOrder(); // 订单生成逻辑 rabbitTemplate.convertAndSend("order_exchange", "order_key", order); System.out.println("订单已生成,发短信任务交给MQ"); // 短信服务:消费者 @RabbitListener(queues = "sms_queue") public void sendSms(Order order) { System.out.println("发送短信,订单ID:" + order.getId()); // 调用短信服务接口 }深度解析
// 用户提交秒杀请求:生产者 rabbitTemplate.convertAndSend("seckill_exchange", "seckill_key", userRequest); System.out.println("用户秒杀请求已进入队列"); // 堆代码 duidaima.com // 秒杀服务:消费者 @RabbitListener(queues = "seckill_queue") public void processSeckill(UserRequest request) { System.out.println("处理秒杀请求,用户ID:" + request.getUserId()); // 执行秒杀逻辑 }深度解析
// 订单服务:生产者 rabbitTemplate.convertAndSend("order_exchange", "order_key", order); System.out.println("订单生成消息已发送"); // 库存服务:消费者 @RabbitListener(queues = "stock_queue") public void updateStock(Order order) { System.out.println("扣减库存,订单ID:" + order.getId()); } // 支付服务:消费者 @RabbitListener(queues = "payment_queue") public void processPayment(Order order) { System.out.println("处理支付,订单ID:" + order.getId()); }深度解析
// 订单服务:生产者 rabbitTemplate.convertAndSend("order_exchange", "order_key", order); System.out.println("订单创建消息已发送"); // 库存服务:消费者 @RabbitListener(queues = "stock_queue") public void updateStock(Order order) { System.out.println("更新库存,订单ID:" + order.getId()); // 执行扣减库存逻辑 }深度解析
// 生产者:广播消息 rabbitTemplate.convertAndSend("price_update_exchange", "", priceUpdate); System.out.println("商品价格更新消息已广播"); // 消费者1:库存服务 @RabbitListener(queues = "stock_queue") public void updateStockPrice(PriceUpdate priceUpdate) { System.out.println("库存价格更新:" + priceUpdate.getProductId()); } // 消费者2:搜索服务 @RabbitListener(queues = "search_queue") public void updateSearchPrice(PriceUpdate priceUpdate) { System.out.println("搜索价格更新:" + priceUpdate.getProductId()); }深度解析
// 服务端:生产者 rabbitTemplate.convertAndSend("log_exchange", "log_key", logEntry); System.out.println("日志已发送"); // 日志分析服务:消费者 @RabbitListener(queues = "log_queue") public void processLog(LogEntry log) { System.out.println("日志处理:" + log.getMessage()); // 存储或分析逻辑 }七. 延迟任务:定时触发操作
// 生产者:发送延迟消息 rabbitTemplate.convertAndSend("delay_exchange", "delay_key", order, message -> { message.getMessageProperties().setDelay(30 * 60 * 1000); // 延迟30分钟 return message; }); System.out.println("订单取消任务已设置"); // 消费者:处理延迟消息 @RabbitListener(queues = "delay_queue") public void cancelOrder(Order order) { System.out.println("取消订单:" + order.getId()); // 取消订单逻辑 }八. 数据同步:跨系统保持数据一致
// 更新订单状态后,将消息发送到MQ Order order = updateOrderStatus(orderId, "PAID"); // 更新订单状态为已支付 rabbitTemplate.convertAndSend("order_exchange", "order_status_key", order); System.out.println("订单状态更新消息已发送:" + order.getId()); 缓存服务:消费者 @RabbitListener(queues = "cache_update_queue") public void updateCache(Order order) { System.out.println("更新缓存,订单ID:" + order.getId() + " 状态:" + order.getStatus()); // 更新缓存逻辑 cacheService.update(order.getId(), order.getStatus()); }推荐服务:消费者
@RabbitListener(queues = "recommendation_queue") public void updateRecommendation(Order order) { System.out.println("更新推荐系统,订单ID:" + order.getId() + " 状态:" + order.getStatus()); // 更新推荐服务逻辑 recommendationService.updateOrderStatus(order); }深度解析
// 定时任务生成器 @Scheduled(cron = "0 0 0 * * ?") // 每天凌晨触发 public void generateTasks() { List<Task> expiredTasks = taskService.getExpiredTasks(); for (Task task : expiredTasks) { rabbitTemplate.convertAndSend("task_exchange", "task_routing_key", task); System.out.println("任务已发送:" + task.getId()); } }订单服务:消费者
@RabbitListener(queues = "order_task_queue") public void processOrderTask(Task task) { System.out.println("处理订单任务:" + task.getId()); // 执行订单清理逻辑 orderService.cleanExpiredOrder(task); }库存服务:消费者
@RabbitListener(queues = "stock_task_queue") public void processStockTask(Task task) { System.out.println("处理库存任务:" + task.getId()); // 执行库存释放逻辑 stockService.releaseStock(task); }深度解析
任务遗漏:MQ确保任务可靠传递,防止任务丢失。
// 上传文件后,将任务写入MQ FileTask fileTask = new FileTask(); fileTask.setFileId(fileId); fileTask.setOperation("COMPRESS"); rabbitTemplate.convertAndSend("file_task_exchange", "file_task_key", fileTask); System.out.println("文件处理任务已发送,文件ID:" + fileId);
@RabbitListener(queues = "file_task_queue") public void processFileTask(FileTask fileTask) { System.out.println("处理文件任务:" + fileTask.getFileId() + " 操作:" + fileTask.getOperation()); // 模拟文件处理逻辑 if ("COMPRESS".equals(fileTask.getOperation())) { fileService.compressFile(fileTask.getFileId()); } else if ("CONVERT".equals(fileTask.getOperation())) { fileService.convertFileFormat(fileTask.getFileId()); } // 更新任务状态 taskService.updateTaskStatus(fileTask.getFileId(), "COMPLETED"); }前端轮询或回调通知
// 前端轮询文件处理状态 setInterval(() => { fetch(`/file/status?fileId=${fileId}`) .then(response => response.json()) .then(status => { if (status === "COMPLETED") { alert("文件处理完成!"); } }); }, 5000);深度解析
后台任务灵活扩展:支持多种操作逻辑,适应复杂文件处理需求。