• Redis Streams如何做到消息不丢失的
  • 发布于 2个月前
  • 498 热度
    0 评论
一 背景 
我们知道Redis Streams有一个很重要的能力,就是它能保证消息不丢失。这篇文章就来详细讲解下它是如何做到消息不丢失的。

二  过程说明
Redis Streams处理消息的流程如下:

具体过程:1 生产者生产消息, 消息进入到Redis Streams里面;2 消费群组读取消息,然后处理消息;3 处理完消息后,将消息标记为"已处理"。

这个过程中,消息丢失的环节只可能出现在2和3。为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,Redis Streams 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。

首先,使用XPENDING来获取已读但并没有处理完毕的消息,操作如下:
127.0.0.1:6379> XPENDING test_streams_topic test_group
1) (integer) 6   # 6个已读取但未处理的消息
2) "1716087652827-0" # 起始ID
3) "1716087870175-0" # 结束ID
4) 1) 1) "consumer"  # 消费者consumer
      2) "6"
获取详细详细:使用 start end count 选项可以获取详细信息('-'表示从最小开始, '+'表示从最大开始,10 表示条数)
127.0.0.1:6379> XPENDING  test_streams_topic test_group - + 10
1) 1) "1716087652827-0"  # 消息ID
   2) "consumer"  # 消息ID
   3) (integer) 455861  # 从读取到现在经历了455861ms
   4) (integer) 1  # 消息被读取了1次
2) 1) "1716087659292-0"
   2) "consumer"
   3) (integer) 455861
   4) (integer) 1
3) 1) "1716087665509-0"
   2) "consumer"
   3) (integer) 455861
   4) (integer) 1
4) 1) "1716087670665-0"
   2) "consumer"
   3) (integer) 455861
   4) (integer) 1
5) 1) "1716087677216-0"
   2) "consumer"
   3) (integer) 455861
   4) (integer) 1
6) 1) "1716087870175-0"
   2) "consumer"
   3) (integer) 313465
   4) (integer) 1
通过上面的返回可以看出,每个Pending的消息有4个属性:
1.消息ID
2.所属消费者
3.已读取时长

4.消息被读取次数


从上述的操作命令,可以看出,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那当消费者把消息处理完毕了,如何告知消息处理完成了呢?使用命令 XACK 告知消息处理完成。操作如下:
127.0.0.1:6379> XACK  test_streams_topic test_group 1716087652827-0  # 通知消息处理结束,用消息ID标识
(integer) 1 # 确认成功
127.0.0.1:6379> XPENDING  test_streams_topic test_group # 再次查看Pending列表
1) (integer) 5  # 已读取但未处理的消息已经变为4个
2) "1716087659292-0"
3) "1716087870175-0"
4) 1) 1) "consumer" # 消息条数变成5了
      2) "5"

如上图所示的这部分

有这样一个Pending机制的好处是,当某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,这样就可以继续处理该消息,从而保证消息的有序和不丢失。

三  当前消费者宕机了如何保证消息能被继续处理
上面说的Pending机制是在当前消费者重启后仍能继续工作的情况下才有效;那如果当前消费者宕机了(下线了),如何保证Pending能被其他消费者处理呢?在这种情况下,需要进行消息转移操作,也就是说,将该消费者Pending的消息,转给其他的消费者处理。

使用语法XCLAIM来实现, 将某个消息转移到自己的Pending列表中。要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。操作如下:
# 转移超过2100886ms的消息id1716087659292-0到消费者B的Pending列表

127.0.0.1:6379>  XCLAIM  test_streams_topic test_group consumerB 2100886 1716087659292-0
1) 1) "1716087659292-0"
   2) 1) "key2"
      2) "val2"
127.0.0.1:6379> XPENDING  test_streams_topic test_group
1) (integer) 5
2) "1716087659292-0"
3) "1716087870175-0"
4) 1) 1) "consumer" # 消息只剩4条
      2) "4" 
   2) 1) "consumerB" # 增加了消费者B
      2) "1"
# 消息1716087659292-0已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING  test_streams_topic test_group - + 10 consumerB
1) 1) "1716087659292-0"
   2) "consumerB"
   3) (integer) 89181
   4) (integer) 2
需要注意的是,转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,因为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功。例如,我把同一条消息转移给两个不同的消费者B和C,只有一个会成功:
127.0.0.1:6379>  XCLAIM  test_streams_topic test_group consumerB 2673946 1716087665509-0
1) 1) "1716087665509-0"
   2) 1) "key3"
      2) "val3"
127.0.0.1:6379>  XCLAIM  test_streams_topic test_group consumerC 2673946 1716087665509-0
(empty array)
127.0.0.1:6379> XPENDING  test_streams_topic test_group - + 10 consumerB
1) 1) "1716087659292-0"
   2) "consumerB"
   3) (integer) 628050
   4) (integer) 2
2) 1) "1716087665509-0"
   2) "consumerB"
   3) (integer) 118434
   4) (integer) 2
127.0.0.1:6379> XPENDING  test_streams_topic test_group - + 10 consumerC
(empty array)
需要说明的是,在消息转移的时候,有一个属性是消息被读取次数,delivery counter,它的作用是统计消息被读取的次数,包括被消息转移。这个属性主要用在判定是否为错误数据上。

如果出现这样的一种情况:某个消息,不能被消费者处理(就是不能XACK),它就会长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息)。触发了临界值条件,我们此时需要做的就是使用XDEL 将坏消息删除。

四 总结说明
如果消息只是被读取,而没有被XACK,那么消息此时就会处于一种Penging状态。该状态下,消息不会发生丢失。哪怕服务器重启了,消息也还是会保留。如果消费者次数宕机或下线了,对应的消息还能进行转移,确保消息能被正常的消费。如果消息无法被消费,消息的被读取次数在消息转移或者是读取的情况下,会发生增长。如果超过一定的阈值,我们就可以认为该消息是坏消息(死信),无法处理,可以删除了。
用户评论