• 如何手动撸一个Redis分布式锁?
  • 发布于 2个月前
  • 161 热度
    0 评论
对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。但是因为这些封装好的组建,让我们越来越懒。我们使用一些封装好的开源组建时,可以了解其中的原理,或者自己动手写一个,可以更好提升你的技术水平。今天我就教大家用原生的 Redis,手动撸一个 Redis 分布式锁,很有意思。

一.问题引入
其实通过 Redis 实现分布式锁,经常会有面试官会问,很多同学都知道用 SetNx() 去获取锁,解决并发问题。
SetNx() 是什么?我简单解答一下。
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

对于下面 2 种问题,你知道如何解决么?
1.如果获取锁的机器挂掉,如何处理?
2.当锁超时时,A、B 两个线程同时获取锁,可能导致锁被同时获取,如何解决?
这个就是我们实现 Redis 分布式锁时,需要重点解决的 2 个问题。

二.理论知识
刚才说过,通过 SetNx() 去获取锁,可以解决并发问题。当获取到锁,处理完业务逻辑后,会将锁释放。

但当机器宕机,或者重启时,没有执行 Del() 删除锁操作,会导致锁一直没有释放。所以,我们还需要记录锁的超时时间,判断锁是否超时。

这里我们通过 GetKey() 获取锁的超时时间 A,通过和当前时间比较,判断锁是否超时。如果锁未超时,直接返回,如果锁超时,重新设置锁的超时时间,成功获取锁。还有其它问题么?当然!因为在并发场景下,会存在 A、B 两个线程同时执行 SetNx(),导致两个线程同时获取到锁。那如何解决呢?将 SetNx() 用 GetSet() 替换。

GetSet() 是什么?我简单解答一下。

Redis Getset 命令用于设置指定 key 的值,并返回 key 的旧值。这里不太好理解,我举个例子。假如 A、B 两个线程,A 先执行,B 后执行:
1.对于线程 A 和 B,通过 GetKey 获取的超时时间都是 T1 = 100;
2.对于线程 A,将超时时间 Ta = 200 通过 GetSet() 设置,返回 T2 = 100,此时满足条件 “T1 == T2”,获取锁成功;

3.对于线程 B,将超时时间 Tb = 201 通过 GetSet() 设置,由于锁超时时间已经被 A 重新设置,所以返回 T2 = 200,此时不满足条件 “T1 == T2”,获取锁失败。


可能有同学会继续问,之前设置的超时是 Ta = 200,现在变成了 Tb = 201,延长或缩短了锁的超时时间,不会有问题么?其实在现实并发场景中,能走到这一步,基本是“同时”进来的,两者的时间差非常小,可以忽略此影响。

三.代码实战
这里给出 Go 代码,注释都写得非常详细,即使你不会 Go,读注释也能读懂。
// 获取分布式锁,需要考虑以下情况:
// 1. 机器A获取到锁,但是在未释放锁之前,机器挂掉或者重启,会导致其它机器全部hang住,这时需要根据锁的超时时间,判断该锁是否需要重置;
// 2. 当锁超时时,需要考虑两台机器同时去获取该锁,需要通过GETSET方法,让先执行该方法的机器获取锁,另外一台继续等待。
func GetDistributeLock(key string, expireTime int64) bool {

 currentTime := time.Now().Unix()
 expires := currentTime + expireTime
 redisAlias := "jointly"
 // 堆代码 duidaima.com
 // 1.获取锁,并将value值设置为锁的超时时间
 redisRet, err := redis.SetNx(redisAlias, key, expires)
 if nil == err && utils.MustInt64(1) == redisRet {
  // 成功获取到锁
  return true
 }

 // 2.当获取到锁的机器突然重启&挂掉时,就需要判断锁的超时时间,如果锁超时,新的机器可以重新获取锁
 // 2.1 获取锁的超时时间
 currentLockTime, err := redis.GetKey(redisAlias, key)
 if err != nil {
  return false
 }

 // 2.2 当"锁的超时时间"大于等于"当前时间",证明锁未超时,直接返回
 if utils.MustInt64(currentLockTime) >= currentTime {
  return false
 }

 // 2.3 将最新的超时时间,更新到锁的value值,并返回旧的锁的超时时间
 oldLockTime, err := redis.GetSet(redisAlias, key, expires)
 if err != nil {
  return false
 }

 // 2.4 当锁的两个"旧的超时时间"相等时,证明之前没有其它机器进行GetSet操作,成功获取锁
 // 说明:这里存在并发情况,如果有A和B同时竞争,A会先GetSet,当B再去GetSet时,oldLockTime就等于A设置的超时时间
 if utils.MustString(oldLockTime) == currentLockTime {
  return true
 }
 return false
}
删除锁逻辑:
// 删除分布式锁
// @return bool true-删除成功;false-删除失败
func DelDistributeLock(key string) bool {
 redisAlias := "jointly"
 redisRet := redis.Del(redisAlias, key)
 if redisRet != nil {
  return false
 }
 return true
}
业务逻辑:
func DoProcess(processId int) {

 fmt.Printf("启动第%d个线程\n", processId)

 redisKey := "redis_lock_key"
 for {
  // 获取分布式锁
  isGetLock := GetDistributeLock(redisKey, 10)
  if isGetLock {
   fmt.Printf("Get Redis Key Success, id:%d\n", processId)
   time.Sleep(time.Second * 3)
   // 删除分布式锁
   DelDistributeLock(redisKey)
  } else {
   // 如果未获取到该锁,为了避免redis负载过高,先睡一会
   time.Sleep(time.Second * 1)
  }
 }
}
最后起个 10 个多线程,去执行这个 DoProcess():
func main() {
 // 初始化资源
 var group string = "group"
 var name string = "name"
 var host string

 // 初始化资源
 host = "http://ip:port"
 _, err := xrpc.NewXRpcDefault(group, name, host)
 if err != nil {
  panic(fmt.Sprintf("initRpc when init rpc  failed, err:%v", err))
 }
 redis.SetRedis("louzai", "redis_louzai")

 // 开启10个线程,去抢Redis分布式锁
 for i := 0; i <= 9; i ++ {
  go DoProcess(i)
 }

 // 避免子线程退出,主线程睡一会
 time.Sleep(time.Second * 100)
 return
}
程序跑了100 s,我们可以看到,每次都只有 1 个线程获取到锁,分别是 2、1、5、9、3,执行结果如下:
启动第0个线程
启动第6个线程
启动第9个线程
启动第4个线程
启动第5个线程
启动第2个线程
启动第1个线程
启动第8个线程
启动第7个线程
启动第3个线程
Get Redis Key Success, id:2
Get Redis Key Success, id:2
Get Redis Key Success, id:1
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3

四.后记
这个代码,其实是我很久之前写的,因为当时 Go 没有开源的分布式锁,但是我又需要通过单机去执行某个任务,所以就自己手动撸了一个,后来在线上跑了 2 年,一直都没有问题。不过期间也遇到过一个坑,就是我们服务迁移时,忘了将旧机器的分布式锁停掉,导致锁经常被旧机器抢占,当时觉得很奇怪,我的锁呢?

写这篇文章时,又让我想到当时工作的场景。最后再切回正题,本文由浅入深,详细讲解了 Redis 实现的详细过程,以及锁超时、并发场景下,如何保证锁能正常释放,且只有一个线程去获取锁。大家还有其它实现 Redis 分布式的方式么?可以给我留言哈。
用户评论