public interface IRateLimiter { bool TryAcquire(); } public sealed class SliddingWindowRateLimiter: IRateLimiter { private readonly TimeSpan _window; private readonly ChannelReader<DateTimeOffset> _reader; private readonly ChannelWriter<DateTimeOffset> _writer; public SliddingWindowRateLimiter(TimeSpan window, int permit) { _window = window; var options = new BoundedChannelOptions (permit) { FullMode = BoundedChannelFullMode.Wait, SingleReader = false, SingleWriter = true }; var channel = Channel .CreateBounded<DateTimeOffset>(options); _reader = channel.Reader; _writer = channel.Writer; Task.Factory.StartNew( Trim,TaskCreationOptions.LongRunning); } public bool TryAcquire() => _writer.TryWrite(DateTimeOffset.UtcNow); private void Trim() { if (!_reader.TryPeek(out var timestamp)) { Task.Delay(_window).Wait(); Trim(); } else { var delay = _window - (DateTimeOffset.UtcNow - timestamp); if (delay > TimeSpan.Zero) { Task.Delay(delay).Wait(); Trim(); } else { var valueTask = _reader.ReadAsync(); if (!valueTask.IsCompleted) _ = valueTask.Result; Trim(); } } } }
在实现的TryAcquire方法中,我们试着将当前时间戳写入这个Channel,并将写入的结果(成功或者失败)作为返回值。为了让Channel中只包含指定时间窗口的时间戳,我们利用一个LongRuning的Task执行Trim方法对过期的时间戳进行“裁剪”。Trim会调用ChannelReader的TRyPeek方法,如果返回False,意味着Channel为空,此时会等待一段窗口时间再进行“裁剪”。
如果提取出来时间戳在Now-Window与当前时间之间,意味着Channel里面的时间戳均在设定的窗口内,此时同样需要等待,等待时间为Window - (Now - Timestamp);只有在提取的时间超出窗口范围,我们才需要将其从Channel中移除。
var limiter = new SliddingWindowRateLimiter( TimeSpan.FromSeconds(2),2); // 堆代码 duidaima.com var index = 0; await Task.WhenAll( Enumerable.Range(1, 100) .Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine( $"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));我们在上面的演示程序中使用这个SliddingWindowRateLimiter,设定的限速规则为 2/2s。我们创建了100个Task并发地调用这个SliddingWindowRateLimiter,并将它返回True时的时间戳显示出来,具体输出如下所示。
public sealed class FixedWindowRateLimiter : IRateLimiter { private readonly long _windowTicks; private readonly int _permit; private long _nextWindowStartTimeTicks; private volatile int _count = 0; public FixedWindowRateLimiter(TimeSpan window, int permit) { _windowTicks = window.Ticks; _permit = permit; _nextWindowStartTimeTicks = DateTimeOffset.UtcNow.Add(window).Ticks; } public bool TryAcquire() { // 超出时间窗口,重置计数器,并调整下一个时间窗口的开始时间 var now = DateTimeOffset.UtcNow.Ticks; var nextWindowStartTimeTicks = nextWindowStartTimeTicks; if (now >= nextWindowStartTimeTicks && Interlocked.CompareExchange( ref _nextWindowStartTimeTicks , now + _windowTicks, nextWindowStartTimeTicks) == nextWindowStartTimeTicks) { Interlocked.Exchange(ref _count, 1); return true; } return _count < _permit && Interlocked.Increment(ref _count) <= _permit; } }在实现的TryAcquire方法中,我们先确定当前时间是否超过了设定的“下一个窗口开始时间”,如果是则调用Interlocked.CompareExchange方法修改__nextWindowStartTimeTicks字段。成功修改__nextWindowStartTimeTicks的线程会调整窗口开始时间,并重置计数器_count为1,并返回True。如果计数器大于等于设定阈值,方法返回False。否则我们让计数器+1,如果该值<=阈值,返回True,否则返回False。
IRateLimiter limiter = new FixedWindowRateLimiter( window: TimeSpan.FromSeconds(2), permit: 2); var index = 0; await Task.WhenAll( Enumerable.Range(1, 100) .Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine( $"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));
将FixedWindowRateLimiter应用到上面的演示程序,依然能得到我们希望的输出结果。