闽公网安备 35020302035485号
SET lock_keyunique_value NX PX 10000lock_key 就是 key 键;
// 释放锁时,先比较 unique_value 是否相等,避免锁的误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
public class RedisLock
{
private readonly IDatabase _database;
private readonly string _lockKey;
private string _lockValue;
private readonly TimeSpan _lockTimeout;
private readonly TimeSpan _renewInterval;
private bool _isLocked;
public RedisLock(IDatabase database, string lockKey, TimeSpan lockTimeout, TimeSpan renewInterval)
{
_database = database;
_lockKey = lockKey;
_lockTimeout = lockTimeout;
_renewInterval = renewInterval;
}
//尝试获取锁,如果成功,则启动一个续租线程
public async Task<bool> AcquireAsync()
{
_lockValue = Guid.NewGuid().ToString();
var acquired = await _database.StringSetAsync(_lockKey, _lockValue, _lockTimeout, When.NotExists);
if (acquired)
{
_isLocked = true;
StartRenewal();
}
return acquired;
}
//定期使用 KeyExpireAsync 命令重置键的过期时间,从而实现续租机制
private async void StartRenewal()
{
while (_isLocked)
{
await Task.Delay(_renewInterval);
await _database.KeyExpireAsync(_lockKey, _lockTimeout);
}
}
}
var redisDistributedLock = new RedisDistributedLock(name, connectionString);
using (redisDistributedLock.Acquire())
{
//持有锁
} //释放锁及相关资源
using (var handle = redisDistributedLock.TryAcquire())
{
if (handle != null)
{
// 我们获得锁
}
else
{
// 别人获得锁
}
}
支持异步和依赖注入,依赖注入:// Startup.cs:
services.AddSingleton<IDistributedLockProvider>(_ => new PostgresDistributedSynchronizationProvider(myConnectionString));
services.AddTransient<SomeService>();
// SomeService.cs
public class SomeService
{
private readonly IDistributedLockProvider _synchronizationProvider;
public SomeService(IDistributedLockProvider synchronizationProvider)
{
this._synchronizationProvider = synchronizationProvider;
}
public void InitializeUserAccount(int id)
{
// 通过provider构造lock
var @lock = this._synchronizationProvider.CreateLock($"UserAccount{id}");
using (@lock.Acquire())
{
//
}
using (this._synchronizationProvider.AcquireLock($"UserAccount{id}"))
{
//
}
}
}
四、浅析DistributedLock的Redis实现
.DistributedLock.Core 是项目的抽象类库,基础分布式锁、读写锁、信号量的Provider和接口。
.其它几个类库是用不同存储系统的具体实现public interface IDistributedLockProvider
{
IDistributedLock CreateLock(string name);
}
IDistributedLock:定义了控制并发访问的基本操作。该接口支持同步和异步方式获取锁,并提供超时和取消功能,以适应各种情况public interface IDistributedLock
{
// 唯一Name
string Name { get; }
// 获取锁的方法
IDistributedSynchronizationHandle Acquire(TimeSpan? timeout = null, CancellationToken cancellationToken = default);
//......
}
DistributedLock.Redis类库,对Acquire的具体实现,该方法是尝试获取Redis分布式锁实例。
private async ValueTask<RedisDistributedLockHandle?> TryAcquireAsync(CancellationToken cancellationToken)
{
// 初始化Redis连接和相关参数
//CreateLockId = $"{Environment.MachineName}_{currentProcess.Id}_" + Guid.NewGuid().ToString("n")
var primitive = new RedisMutexPrimitive(this.Key, RedLockHelper.CreateLockId(), this._options.RedLockTimeouts);
// 获取和设置锁
var tryAcquireTasks = await new RedLockAcquire(primitive, this._databases, cancellationToken).TryAcquireAsync().ConfigureAwait(false);
// 成功后,RedLockHandle这个里边实现了续租机制
return tryAcquireTasks != null
? new RedisDistributedLockHandle(new RedLockHandle(primitive, tryAcquireTasks, extensionCadence: this._options.ExtensionCadence, expiry: this._options.RedLockTimeouts.Expiry))
: null;
}
根据当前线程是否在同步上下文,对单库和多库实现进行区分和实现// 该方法用于尝试获取分布式锁,并返回一个表示各个数据库节点获取锁状态的任务字典
public async ValueTask<Dictionary<IDatabase, Task<bool>>?> TryAcquireAsync()
{
// 检查当前线程是否在同步上下文中执行,以便根据不同情况采取不同的获取锁策略
if (SyncViaAsync.IsSynchronous&& this._databases.Count == 1)
return this.TrySingleFullySynchronousAcquire();
// 创建一个任务字典,将每个数据库连接和其对应的获取锁任务关联起来
var tryAcquireTasks = this._databases.ToDictionary(
db => db,
db => Helpers.SafeCreateTask(state => state.primitive.TryAcquireAsync(state.db), (primitive, db))
);
// 等待所有获取锁任务完成,并返回一个表示整体状态的任务
var waitForAcquireTask = this.WaitForAcquireAsync(tryAcquireTasks).AwaitSyncOverAsync().ConfigureAwait(false);
// 执行清理操作
// 返回结果
return succeeded ? tryAcquireTasks : null;
}
单库获取Redis分布式锁,就是通过set nx 设置值,返回bool,失败就释放资源,成功检查是否超时。不超时就返回任务字典private Dictionary<IDatabase, Task<bool>>? TrySingleFullySynchronousAcquire()
{
var database = this._databases.Single();
bool success;
var stopwatch = Stopwatch.StartNew();
// 通过StackExchange.Redis的StringSet进行无值设置key(set nx)
try { success = this._primitive.TryAcquire(database); }
catch
{
// 确保释放锁,以便防止出现死锁等问题。然后重新抛出异常
}
if (success)
{
// 检查是否在超时时间内,并返回一个包含成功状态的任务字典;否则继续释放锁并返回null
}
return null;
}
多库中是否获取到分布式锁private async Task<bool> WaitForAcquireAsync(IReadOnlyDictionary<IDatabase, Task<bool>> tryAcquireTasks)
{
// 超时或取消时自动停止等待
// 堆代码 duidaima.com
using var timeout = new TimeoutTask(this._primitive.AcquireTimeout, this._cancellationToken);
var incompleteTasks = new HashSet<Task>(tryAcquireTasks.Values) { timeout.Task };
// 计数器
var successCount = 0;
var failCount = 0;
var faultCount = 0;
while (true)
{
// 不断等待任务完成,如果任务为timeout,则表示超时;否则需要根据任务的状态和信号来判断是否成功获取锁
var completed = await Task.WhenAny(incompleteTasks).ConfigureAwait(false);
if (completed == timeout.Task)
return false; // 超时
// 判断是否超过成功或者失败的阀值,是否超过1/2
if (completed.Status == TaskStatus.RanToCompletion)
{
var result = await ((Task<bool>)completed).ConfigureAwait(false);
if (result)
{
++successCount;
// 是否超过1/2的库
if (RedLockHelper.HasSufficientSuccesses(successCount, this._databases.Count)) { return true; }
}
else
{
++failCount;
if (RedLockHelper.HasTooManyFailuresOrFaults(failCount, this._databases.Count)) { return false; }
}
}
else
{
++faultCount;
// ......
}
// ......
}
}
截止到目前,我们就知道如何获取和设置分布式锁了。接下来我们就看下是如何实现续租机制的。就是LeaseMonitor这个对象。private static Task CreateMonitoringLoopTask(WeakReference<LeaseMonitor> weakMonitor, TimeoutValue monitoringCadence, CancellationToken disposalToken)
{
// 创建监视任务
return Task.Run(() => MonitoringLoop());
async Task MonitoringLoop()
{
var leaseLifetime = Stopwatch.StartNew();
do
{
await Task.Delay(monitoringCadence.InMilliseconds, disposalToken).TryAwait();
}
// 检查RedLock租约的状态和可用性
while (!disposalToken.IsCancellationRequested && await RunMonitoringLoopIterationAsync(weakMonitor, leaseLifetime).ConfigureAwait(false));
}
}
RunMonitoringLoopIterationAsync 里边最终调用了续时的lua脚本