• 在C#中如何实现一个支持 Ack 的内存队列
  • 发布于 2个月前
  • 174 热度
    0 评论
  • 黄月英
  • 0 粉丝 57 篇博客
  •   
前言
有些时候我们会希望从消息队列里消费消息时只有消费成功 Ack 之后才将消息从队列中移除,我们尝试来实现一个支持 Ack 的队列。

实例
使用示例如下:
var ackQueue = new AckQueue(new AckQueueOptions()
{
    AckTimeout = TimeSpan.FromSeconds(1)
});
await ackQueue.EnqueueAsync(new CounterEvent { Counter = 1 });
await ackQueue.EnqueueAsync(new CounterEvent { Counter = 2 });
// 堆代码 duidaima.com
var event1 = await ackQueue.DequeueAsync<CounterEvent>();
ArgumentNullException.ThrowIfNull(event1);
Console.WriteLine(@$"event1: {event1.ToJson()}");

var event2 = await ackQueue.DequeueAsync<CounterEvent>();
ArgumentNullException.ThrowIfNull(event2);
Console.WriteLine(@$"event2: {event2.ToJson()}");
await ackQueue.AckMessageAsync(event2.Properties.EventId);

var event3 = await ackQueue.DequeueAsync<CounterEvent>();
Console.WriteLine(@$"event3: {event3.ToJson()}");

await Task.Delay(2000);
ackQueue.RequeueUnAckedMessages();

var event4 = await ackQueue.DequeueAsync<CounterEvent>();
Console.WriteLine(@$"event4: {event4.ToJson()}");
ArgumentNullException.ThrowIfNull(event4);
await ackQueue.AckMessageAsync(event4.Properties.EventId);
这里我们创建了一个 AckQueue 并且指定了 AckTimeout 为 1s 来方便测试,并向队列里添加了两条消息,然后再从队列中取出消息进行消费,输出结果如下:

前面两个分别取出了队列里的两条消息,第三次没有取到消息返回了 null,因为此时队列中没有消息,之后将没有 ack 的消息重新添加到队列里,所以此时队列里重新将第一条消息加入了队列,所以我们再次取消息的时候再次取到了第一条消息,从输出的 EventId/EventAt 可以看出来和第一个消息是一样的

实施
一般地我们可能会基于 ConcurrentQueue 或者 Channel 来实现一个内存队列,但是如果直接使用,取出来之后如果出现异常导致消息没有消费成功就会导致消息丢失,我们可以将消息存起来放在一个缓冲区,当消息消费成功 Ack 的时候再从缓冲区中移除,为了比较方便地处理我们也可以设置一个时间定期把缓冲区的消息重新加入回队列中。

实现代码如下:
public sealed class AckQueueOptions
{
    public TimeSpan AckTimeout { get; set; } = TimeSpan.FromMinutes(10);
    public bool AutoRequeue { get; set; }
    public TimeSpan RequeuePeriod { get; set; } = TimeSpan.FromMinutes(1);
}

public sealed class AckQueue : IDisposable
{
    private readonly AckQueueOptions _options;
    private readonly ConcurrentQueue<IEvent> _queue = new();
    private readonly ConcurrentDictionary<string, IEvent> _unAckedMessages = new();
    private readonly Timer? _timer;

    public AckQueue() : this(new()) { }

    public AckQueue(AckQueueOptions options)
    {
        _options = options;
        if (options.AutoRequeue)
        {
            _timer = new Timer(_ => RequeueUnAckedMessages(), null, options.RequeuePeriod, options.RequeuePeriod);
        }
    }
    public Task EnqueueAsync<TEvent>(TEvent @event, EventProperties? properties = null)
    {
        properties ??= new EventProperties();
        if (string.IsNullOrEmpty(properties.EventId))
        {
            properties.EventId = Guid.NewGuid().ToString();
        }

        if (properties.EventAt == default)
        {
            properties.EventAt = DateTimeOffset.Now;
        }

        var internalEvent = new EventWrapper<TEvent>
        {
            Data = @event,
            Properties = properties
        };

        _queue.Enqueue(internalEvent);
        return Task.CompletedTask;
    }

    public Task<IEvent<TEvent>?> DequeueAsync<TEvent>()
    {
        if (_queue.TryDequeue(outvar eventWrapper))
        {
            _unAckedMessages.TryAdd(eventWrapper.Properties.EventId, eventWrapper);
            return Task.FromResult((IEvent<TEvent>?)eventWrapper);
        }
        return Task.FromResult<IEvent<TEvent>?>(null);
    }
    public Task AckMessageAsync(string eventId)
    {
        _unAckedMessages.TryRemove(eventId, out _);
        return Task.CompletedTask;
    }

    public void RequeueUnAckedMessages()
    {
        foreach (var message in _unAckedMessages)
        {
            if (DateTimeOffset.Now - message.Value.Properties.EventAt > _options.AckTimeout)
            {
                if (_unAckedMessages.TryRemove(message.Key, outvar eventWrapper)
                    && eventWrapper != null)
                {
                    _queue.Enqueue(eventWrapper);
                }
            }
        }
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }
}
为了实现自动化地将没有及时 Ack 的消息重新加入队列重新消费,我们添加了一个 Timer 默认 disable,enable 的时候定期重新将未及时 ack 的消息重新加入队列

更多例子可以参考单元测试:https://github.com/WeihanLi/WeihanLi.Common/blob/1.0.74/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs

其它

这个实现是基于内存来实现的,如果要实现比较可靠的消息队列,基于 Redis 来实现的话要怎么实现呢?


参考
https://github.com/WeihanLi/WeihanLi.Common/blob/1.0.74/src/WeihanLi.Common/Event/AckQueue.cs
https://github.com/WeihanLi/WeihanLi.Common/blob/1.0.74/test/WeihanLi.Common.Test/EventsTest/AckQueueTest.cs
https://www.nuget.org/packages/WeihanLi.Common/1.0.74
https://github.com/WeihanLi/WeihanLi.Common/pull/242
https://github.com/WeihanLi/WeihanLi.Common/blob/dev/samples/DotNetCoreSample/EventTest.cs#L52
用户评论