var ackQueue = new AckQueue(new AckQueueOptions() { AckTimeout = TimeSpan.FromSeconds(1) }); await ackQueue.EnqueueAsync(new CounterEvent { Counter = 1 }); await ackQueue.EnqueueAsync(new CounterEvent { Counter = 2 }); // 堆代码 duidaima.com var event1 = await ackQueue.DequeueAsync<CounterEvent>(); ArgumentNullException.ThrowIfNull(event1); Console.WriteLine(@$"event1: {event1.ToJson()}"); var event2 = await ackQueue.DequeueAsync<CounterEvent>(); ArgumentNullException.ThrowIfNull(event2); Console.WriteLine(@$"event2: {event2.ToJson()}"); await ackQueue.AckMessageAsync(event2.Properties.EventId); var event3 = await ackQueue.DequeueAsync<CounterEvent>(); Console.WriteLine(@$"event3: {event3.ToJson()}"); await Task.Delay(2000); ackQueue.RequeueUnAckedMessages(); var event4 = await ackQueue.DequeueAsync<CounterEvent>(); Console.WriteLine(@$"event4: {event4.ToJson()}"); ArgumentNullException.ThrowIfNull(event4); await ackQueue.AckMessageAsync(event4.Properties.EventId);这里我们创建了一个 AckQueue 并且指定了 AckTimeout 为 1s 来方便测试,并向队列里添加了两条消息,然后再从队列中取出消息进行消费,输出结果如下:
public sealed class AckQueueOptions { public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(10); public bool AutoRequeue { get; set; } public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1); } public sealed class AckQueue : IDisposable { private readonly AckQueueOptions _options; private readonly ConcurrentQueue<IEvent> _queue = new(); private readonly ConcurrentDictionary<string, IEvent> _unAckedMessages = new(); private readonly Timer? _timer; public AckQueue() : this(new()) { } public AckQueue(AckQueueOptions options) { _options = options; if (options.AutoRequeue) { _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod); } } public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null) { properties ??= new EventProperties(); if (string.IsNullOrEmpty(properties.EventId)) { properties.EventId = Guid.NewGuid().ToString(); } if (properties.EventAt == default) { properties.EventAt = DateTimeOffset.Now; } var internalEvent = new EventWrapper<TEvent> { Data = @event, Properties = properties }; _queue.Enqueue(internalEvent); return Task.CompletedTask; } public Task<IEvent<TEvent>?> DequeueAsync<TEvent>() { if (_queue.TryDequeue(outvar eventWrapper)) { _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper); return Task.FromResult((IEvent<TEvent>?)eventWrapper); } return Task.FromResult<IEvent<TEvent>?>(null); } public Task AckMessageAsync(string eventId) { _unAckedMessages.TryRemove(eventId, out _); return Task.CompletedTask; } public void RequeueUnAckedMessages() { foreach (var message in _unAckedMessages) { if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout) { if (_unAckedMessages.TryRemove(message.Key, outvar eventWrapper) && eventWrapper != null) { _queue.Enqueue(eventWrapper); } } } } public void Dispose() { _timer?.Dispose(); } }为了实现自动化地将没有及时 Ack 的消息重新加入队列重新消费,我们添加了一个 Timer 默认 disable,enable 的时候定期重新将未及时 ack 的消息重新加入队列
这个实现是基于内存来实现的,如果要实现比较可靠的消息队列,基于 Redis 来实现的话要怎么实现呢?