SET lock_keyunique_value NX PX 10000lock_key 就是 key 键;
// 释放锁时,先比较 unique_value 是否相等,避免锁的误释放 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
public class RedisLock { private readonly IDatabase _database; private readonly string _lockKey; private string _lockValue; private readonly TimeSpan _lockTimeout; private readonly TimeSpan _renewInterval; private bool _isLocked; public RedisLock(IDatabase database, string lockKey, TimeSpan lockTimeout, TimeSpan renewInterval) { _database = database; _lockKey = lockKey; _lockTimeout = lockTimeout; _renewInterval = renewInterval; } //尝试获取锁,如果成功,则启动一个续租线程 public async Task<bool> AcquireAsync() { _lockValue = Guid.NewGuid().ToString(); var acquired = await _database.StringSetAsync(_lockKey, _lockValue, _lockTimeout, When.NotExists); if (acquired) { _isLocked = true; StartRenewal(); } return acquired; } //定期使用 KeyExpireAsync 命令重置键的过期时间,从而实现续租机制 private async void StartRenewal() { while (_isLocked) { await Task.Delay(_renewInterval); await _database.KeyExpireAsync(_lockKey, _lockTimeout); } } }
var redisDistributedLock = new RedisDistributedLock(name, connectionString); using (redisDistributedLock.Acquire()) { //持有锁 } //释放锁及相关资源
using (var handle = redisDistributedLock.TryAcquire()) { if (handle != null) { // 我们获得锁 } else { // 别人获得锁 } }支持异步和依赖注入,依赖注入:
// Startup.cs: services.AddSingleton<IDistributedLockProvider>(_ => new PostgresDistributedSynchronizationProvider(myConnectionString)); services.AddTransient<SomeService>(); // SomeService.cs public class SomeService { private readonly IDistributedLockProvider _synchronizationProvider; public SomeService(IDistributedLockProvider synchronizationProvider) { this._synchronizationProvider = synchronizationProvider; } public void InitializeUserAccount(int id) { // 通过provider构造lock var @lock = this._synchronizationProvider.CreateLock($"UserAccount{id}"); using (@lock.Acquire()) { // } using (this._synchronizationProvider.AcquireLock($"UserAccount{id}")) { // } } }四、浅析DistributedLock的Redis实现
.DistributedLock.Core 是项目的抽象类库,基础分布式锁、读写锁、信号量的Provider和接口。
.其它几个类库是用不同存储系统的具体实现public interface IDistributedLockProvider { IDistributedLock CreateLock(string name); }IDistributedLock:定义了控制并发访问的基本操作。该接口支持同步和异步方式获取锁,并提供超时和取消功能,以适应各种情况
public interface IDistributedLock { // 唯一Name string Name { get; } // 获取锁的方法 IDistributedSynchronizationHandle Acquire(TimeSpan? timeout = null, CancellationToken cancellationToken = default); //...... } DistributedLock.Redis类库,对Acquire的具体实现,该方法是尝试获取Redis分布式锁实例。 private async ValueTask<RedisDistributedLockHandle?> TryAcquireAsync(CancellationToken cancellationToken) { // 初始化Redis连接和相关参数 //CreateLockId = $"{Environment.MachineName}_{currentProcess.Id}_" + Guid.NewGuid().ToString("n") var primitive = new RedisMutexPrimitive(this.Key, RedLockHelper.CreateLockId(), this._options.RedLockTimeouts); // 获取和设置锁 var tryAcquireTasks = await new RedLockAcquire(primitive, this._databases, cancellationToken).TryAcquireAsync().ConfigureAwait(false); // 成功后,RedLockHandle这个里边实现了续租机制 return tryAcquireTasks != null ? new RedisDistributedLockHandle(new RedLockHandle(primitive, tryAcquireTasks, extensionCadence: this._options.ExtensionCadence, expiry: this._options.RedLockTimeouts.Expiry)) : null; }根据当前线程是否在同步上下文,对单库和多库实现进行区分和实现
// 该方法用于尝试获取分布式锁,并返回一个表示各个数据库节点获取锁状态的任务字典 public async ValueTask<Dictionary<IDatabase, Task<bool>>?> TryAcquireAsync() { // 检查当前线程是否在同步上下文中执行,以便根据不同情况采取不同的获取锁策略 if (SyncViaAsync.IsSynchronous&& this._databases.Count == 1) return this.TrySingleFullySynchronousAcquire(); // 创建一个任务字典,将每个数据库连接和其对应的获取锁任务关联起来 var tryAcquireTasks = this._databases.ToDictionary( db => db, db => Helpers.SafeCreateTask(state => state.primitive.TryAcquireAsync(state.db), (primitive, db)) ); // 等待所有获取锁任务完成,并返回一个表示整体状态的任务 var waitForAcquireTask = this.WaitForAcquireAsync(tryAcquireTasks).AwaitSyncOverAsync().ConfigureAwait(false); // 执行清理操作 // 返回结果 return succeeded ? tryAcquireTasks : null; }单库获取Redis分布式锁,就是通过set nx 设置值,返回bool,失败就释放资源,成功检查是否超时。不超时就返回任务字典
private Dictionary<IDatabase, Task<bool>>? TrySingleFullySynchronousAcquire() { var database = this._databases.Single(); bool success; var stopwatch = Stopwatch.StartNew(); // 通过StackExchange.Redis的StringSet进行无值设置key(set nx) try { success = this._primitive.TryAcquire(database); } catch { // 确保释放锁,以便防止出现死锁等问题。然后重新抛出异常 } if (success) { // 检查是否在超时时间内,并返回一个包含成功状态的任务字典;否则继续释放锁并返回null } return null; }多库中是否获取到分布式锁
private async Task<bool> WaitForAcquireAsync(IReadOnlyDictionary<IDatabase, Task<bool>> tryAcquireTasks) { // 超时或取消时自动停止等待 // 堆代码 duidaima.com using var timeout = new TimeoutTask(this._primitive.AcquireTimeout, this._cancellationToken); var incompleteTasks = new HashSet<Task>(tryAcquireTasks.Values) { timeout.Task }; // 计数器 var successCount = 0; var failCount = 0; var faultCount = 0; while (true) { // 不断等待任务完成,如果任务为timeout,则表示超时;否则需要根据任务的状态和信号来判断是否成功获取锁 var completed = await Task.WhenAny(incompleteTasks).ConfigureAwait(false); if (completed == timeout.Task) return false; // 超时 // 判断是否超过成功或者失败的阀值,是否超过1/2 if (completed.Status == TaskStatus.RanToCompletion) { var result = await ((Task<bool>)completed).ConfigureAwait(false); if (result) { ++successCount; // 是否超过1/2的库 if (RedLockHelper.HasSufficientSuccesses(successCount, this._databases.Count)) { return true; } } else { ++failCount; if (RedLockHelper.HasTooManyFailuresOrFaults(failCount, this._databases.Count)) { return false; } } } else { ++faultCount; // ...... } // ...... } }截止到目前,我们就知道如何获取和设置分布式锁了。接下来我们就看下是如何实现续租机制的。就是LeaseMonitor这个对象。
private static Task CreateMonitoringLoopTask(WeakReference<LeaseMonitor> weakMonitor, TimeoutValue monitoringCadence, CancellationToken disposalToken) { // 创建监视任务 return Task.Run(() => MonitoringLoop()); async Task MonitoringLoop() { var leaseLifetime = Stopwatch.StartNew(); do { await Task.Delay(monitoringCadence.InMilliseconds, disposalToken).TryAwait(); } // 检查RedLock租约的状态和可用性 while (!disposalToken.IsCancellationRequested && await RunMonitoringLoopIterationAsync(weakMonitor, leaseLifetime).ConfigureAwait(false)); } }RunMonitoringLoopIterationAsync 里边最终调用了续时的lua脚本