import java.util.concurrent.*; public class OrderCancelService { private static final DelayQueue<OrderTask> delayQueue = new DelayQueue<>(); public static void main(String[] args) throws InterruptedException { // 启动消费者线程 new Thread(() -> { while (true) { try { OrderTask task = delayQueue.take(); // 获取到期任务 System.out.println("取消订单:" + task.getOrderId()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }).start(); // 堆代码 duidaima.com // 模拟订单创建 for (int i = 1; i <= 5; i++) { delayQueue.put(new OrderTask(i, System.currentTimeMillis() + 5000)); // 5秒后取消 System.out.println("订单" + i + "已创建"); } } static class OrderTask implements Delayed { private final long expireTime; private final int orderId; public OrderTask(int orderId, long expireTime) { this.orderId = orderId; this.expireTime = expireTime; } public int getOrderId() { return orderId; } @Override public long getDelay(TimeUnit unit) { return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.expireTime, ((OrderTask) o).expireTime); } } }优点:
2.随着订单量增加,内存占用会显著上升。
public void cancelExpiredOrders() { String sql = "UPDATE orders SET status = 'CANCELLED' WHERE status = 'PENDING' AND create_time < ?"; try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setTimestamp(1, new Timestamp(System.currentTimeMillis() - 30 * 60 * 1000)); // 30分钟未支付取消 int affectedRows = ps.executeUpdate(); System.out.println("取消订单数量:" + affectedRows); } catch (SQLException e) { e.printStackTrace(); } }优点:
2.结合分表分库策略,减少单表压力。
public void addOrderToQueue(String orderId, long expireTime) { jedis.zadd("order_delay_queue", expireTime, orderId); } public void processExpiredOrders() { long now = System.currentTimeMillis(); Set<String> expiredOrders = jedis.zrangeByScore("order_delay_queue", 0, now); for (String orderId : expiredOrders) { System.out.println("取消订单:" + orderId); jedis.zrem("order_delay_queue", orderId); // 删除已处理的订单 } }优点:
2.需要额外处理 Redis 宕机或数据丢失的问题。
public void setOrderWithExpiration(String orderId, long expireSeconds) { jedis.setex("order:" + orderId, expireSeconds, "PENDING"); } 订阅 Redis 的过期事件: public void subscribeToExpirationEvents() { Jedis jedis = new Jedis("localhost"); jedis.psubscribe(new JedisPubSub() { @Override public void onPMessage(String pattern, String channel, String message) { if (channel.equals("__keyevent@0__:expired")) { System.out.println("接收到过期事件,取消订单:" + message); // 执行取消订单的业务逻辑 } } }, "__keyevent@0__:expired"); // 订阅过期事件 }优点:
notify-keyspace-events Ex五. 基于消息队列(如RabbitMQ)
public void sendOrderToDelayQueue(String orderId, long delay) { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); ConnectionFactory factory = new ConnectionFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args); channel.queueDeclare("delay_queue", true, false, false, null); channel.queueBind("delay_queue", "delayed_exchange", "order.cancel"); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(Map.of("x-delay", delay)) // 延迟时间 .build(); channel.basicPublish("delayed_exchange", "order.cancel", props, orderId.getBytes()); } catch (Exception e) { e.printStackTrace(); } }优点:
2.需要处理队列堆积的问题。
@Scheduled(cron = "0 */5 * * * ?") public void scanAndCancelOrders() { System.out.println("开始扫描并取消过期订单"); // 这里调用数据库更新逻辑 }优点:
2.框架本身较复杂。
DataStream<OrderEvent> orderStream = env.fromCollection(orderEvents); orderStream .keyBy(OrderEvent::getOrderId) .process(new KeyedProcessFunction<String, OrderEvent, Void>() { @Override public void processElement(OrderEvent event, Context ctx, Collector<Void> out) throws Exception { // 注册一个定时器 ctx.timerService().registerProcessingTimeTimer(event.getTimestamp() + 30000); // 30秒超时 } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception { // 定时器触发,执行订单取消逻辑 System.out.println("订单超时取消,订单ID:" + ctx.getCurrentKey()); } });优点:
2.对运维要求较高。