闽公网安备 35020302035485号
// Demo: at-least-once consumption with explicit acknowledgements.
// A short AckTimeout is used so the redelivery path can be observed quickly.
// AckQueue implements IDisposable, so the using declaration releases it at the end of the scope.
using var ackQueue = new AckQueue(new AckQueueOptions
{
    AckTimeout = TimeSpan.FromSeconds(1)
});

await ackQueue.EnqueueAsync(new CounterEvent { Counter = 1 });
await ackQueue.EnqueueAsync(new CounterEvent { Counter = 2 });

// Take both messages; only the second one is acknowledged below.
var event1 = await ackQueue.DequeueAsync<CounterEvent>();
ArgumentNullException.ThrowIfNull(event1);
Console.WriteLine($"event1: {event1.ToJson()}");

var event2 = await ackQueue.DequeueAsync<CounterEvent>();
ArgumentNullException.ThrowIfNull(event2);
Console.WriteLine($"event2: {event2.ToJson()}");
await ackQueue.AckMessageAsync(event2.Properties.EventId);

// The queue is empty at this point (event1 is still awaiting its ack), so event3 is null.
// NOTE(review): ToJson() is invoked on that null value; presumably the extension tolerates null - confirm.
var event3 = await ackQueue.DequeueAsync<CounterEvent>();
Console.WriteLine($"event3: {event3.ToJson()}");

// Wait past the ack timeout, then move timed-out messages back into the queue.
await Task.Delay(2000);
ackQueue.RequeueUnAckedMessages();

// The un-acked first message is delivered again; guard before using it, as for event1/event2.
var event4 = await ackQueue.DequeueAsync<CounterEvent>();
ArgumentNullException.ThrowIfNull(event4);
Console.WriteLine($"event4: {event4.ToJson()}");
await ackQueue.AckMessageAsync(event4.Properties.EventId);
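这里我们创建了一个 AckQueue,并且指定 AckTimeout 为 1s 来方便测试,然后向队列里添加了两条消息,再从队列中取出消息进行消费。按照代码逻辑,输出结果应当是:event1 和 event2 依次被取出;event2 被 Ack 而 event1 没有,此时队列已空,所以 event3 为 null;等待 2 秒超过 AckTimeout 并调用 RequeueUnAckedMessages 之后,未 Ack 的 event1 被重新加入队列,因此 event4 取到的仍是第一条消息。AckQueue 的实现如下: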
/// <summary>
/// Settings that control how <c>AckQueue</c> handles messages that were
/// dequeued but not acknowledged.
/// </summary>
public sealed class AckQueueOptions
{
    /// <summary>
    /// Threshold after which an unacknowledged message is put back into the
    /// queue by <c>AckQueue.RequeueUnAckedMessages</c>. Defaults to 10 minutes.
    /// </summary>
    public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(10);

    /// <summary>
    /// When <c>true</c>, the queue creates a timer that periodically calls
    /// <c>RequeueUnAckedMessages</c>. Defaults to <c>false</c>.
    /// </summary>
    public bool AutoRequeue { get; set; }

    /// <summary>
    /// Initial delay and interval of the automatic requeue timer; only used
    /// when <see cref="AutoRequeue"/> is enabled. Defaults to 1 minute.
    /// </summary>
    public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1);
}
/// <summary>
/// In-memory queue with explicit acknowledgements: a dequeued message is kept
/// in an "un-acked" set until <see cref="AckMessageAsync"/> is called for it,
/// and messages that stay unacknowledged longer than
/// <see cref="AckQueueOptions.AckTimeout"/> can be put back into the queue
/// via <see cref="RequeueUnAckedMessages"/>.
/// </summary>
public sealed class AckQueue : IDisposable
{
    private readonly AckQueueOptions _options;

    // Messages waiting to be delivered.
    private readonly ConcurrentQueue<IEvent> _queue = new();

    // Delivered but not yet acknowledged messages, keyed by event id, together
    // with the time they were handed out. The ack timeout is measured from this
    // delivery time rather than from the event's creation time, so a message
    // that waited in the queue for a long time is not treated as expired the
    // moment it is dequeued.
    private readonly ConcurrentDictionary<string, (IEvent Event, DateTimeOffset DequeuedAt)> _unAckedMessages = new();

    // Only created when AutoRequeue is enabled.
    private readonly Timer? _timer;

    /// <summary>Creates a queue with default <see cref="AckQueueOptions"/>.</summary>
    public AckQueue() : this(new()) { }

    /// <summary>
    /// Creates a queue with the given options. When
    /// <see cref="AckQueueOptions.AutoRequeue"/> is set, a timer is started that
    /// calls <see cref="RequeueUnAckedMessages"/> every
    /// <see cref="AckQueueOptions.RequeuePeriod"/>.
    /// </summary>
    /// <param name="options">Queue settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public AckQueue(AckQueueOptions options)
    {
        ArgumentNullException.ThrowIfNull(options);
        _options = options;
        if (options.AutoRequeue)
        {
            _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod);
        }
    }

    /// <summary>
    /// Wraps <paramref name="event"/> together with its properties and appends it to the queue.
    /// </summary>
    /// <typeparam name="TEvent">Payload type.</typeparam>
    /// <param name="event">The payload to enqueue.</param>
    /// <param name="properties">
    /// Optional metadata. A missing <c>EventId</c> is replaced with a new GUID and an
    /// unset <c>EventAt</c> with the current time; note that a caller-supplied
    /// instance is modified in place.
    /// </param>
    /// <returns>An already completed task.</returns>
    public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null)
    {
        properties ??= new EventProperties();
        if (string.IsNullOrEmpty(properties.EventId))
        {
            properties.EventId = Guid.NewGuid().ToString();
        }
        if (properties.EventAt == default)
        {
            properties.EventAt = DateTimeOffset.Now;
        }

        var internalEvent = new EventWrapper<TEvent>
        {
            Data = @event,
            Properties = properties
        };
        _queue.Enqueue(internalEvent);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Takes the next message from the queue and tracks it as unacknowledged
    /// until <see cref="AckMessageAsync"/> is called with its event id.
    /// </summary>
    /// <typeparam name="TEvent">Expected payload type of the next message.</typeparam>
    /// <returns>The next message, or <c>null</c> when the queue is empty.</returns>
    /// <remarks>
    /// The message is cast to the requested event type after it has been
    /// registered as unacknowledged, so a type mismatch surfaces as an
    /// <see cref="InvalidCastException"/> while the message stays tracked.
    /// </remarks>
    public Task<IEvent<TEvent>?> DequeueAsync<TEvent>()
    {
        if (_queue.TryDequeue(out var eventWrapper))
        {
            // TryAdd leaves an existing entry with the same event id untouched.
            _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, (eventWrapper, DateTimeOffset.UtcNow));
            return Task.FromResult((IEvent<TEvent>?)eventWrapper);
        }

        return Task.FromResult<IEvent<TEvent>?>(null);
    }

    /// <summary>
    /// Acknowledges a delivered message so that it will not be requeued.
    /// Unknown ids are ignored.
    /// </summary>
    /// <param name="eventId">Id of the message to acknowledge.</param>
    /// <returns>An already completed task.</returns>
    public Task AckMessageAsync(string eventId)
    {
        _unAckedMessages.TryRemove(eventId, out _);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Moves every message that has been delivered but not acknowledged for
    /// longer than <see cref="AckQueueOptions.AckTimeout"/> back to the end of
    /// the queue so that it can be consumed again.
    /// </summary>
    public void RequeueUnAckedMessages()
    {
        var now = DateTimeOffset.UtcNow;
        foreach (var (eventId, pending) in _unAckedMessages)
        {
            if (now - pending.DequeuedAt <= _options.AckTimeout)
            {
                continue;
            }

            // TryRemove fails when the message was acknowledged in the meantime;
            // in that case it must not be delivered again.
            if (_unAckedMessages.TryRemove(eventId, out var expired))
            {
                _queue.Enqueue(expired.Event);
            }
        }
    }

    /// <summary>Stops the automatic requeue timer, if one was created.</summary>
    public void Dispose()
    {
        _timer?.Dispose();
    }
}
为了能够自动将没有及时 Ack 的消息重新加入队列以便再次消费,我们添加了一个 Timer:它默认不启用(AutoRequeue 默认为 false),启用之后会按照 RequeuePeriod 指定的周期,定期把超时未 Ack 的消息重新加入队列。
这个实现是基于内存来实现的,如果要实现比较可靠的消息队列,基于 Redis 来实现的话要怎么实现呢?