闽公网安备 35020302035485号

127.0.0.1:6379> XPENDING test_streams_topic test_group
1) (integer) 6 # 6个已读取但未处理的消息
2) "1716087652827-0" # 起始ID
3) "1716087870175-0" # 结束ID
4) 1) 1) "consumer" # 消费者consumer
2) "6"
获取详细详细:使用 start end count 选项可以获取详细信息('-'表示从最小开始, '+'表示从最大开始,10 表示条数)127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 1) 1) "1716087652827-0" # 消息ID 2) "consumer" # 消息ID 3) (integer) 455861 # 从读取到现在经历了455861ms 4) (integer) 1 # 消息被读取了1次 2) 1) "1716087659292-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 3) 1) "1716087665509-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 4) 1) "1716087670665-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 5) 1) "1716087677216-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 6) 1) "1716087870175-0" 2) "consumer" 3) (integer) 313465 4) (integer) 1通过上面的返回可以看出,每个Pending的消息有4个属性:
4.消息被读取次数
127.0.0.1:6379> XACK test_streams_topic test_group 1716087652827-0 # 通知消息处理结束,用消息ID标识
(integer) 1 # 确认成功
127.0.0.1:6379> XPENDING test_streams_topic test_group # 再次查看Pending列表
1) (integer) 5 # 已读取但未处理的消息已经变为4个
2) "1716087659292-0"
3) "1716087870175-0"
4) 1) 1) "consumer" # 消息条数变成5了
2) "5"


# 转移超过2100886ms的消息id1716087659292-0到消费者B的Pending列表
127.0.0.1:6379> XCLAIM test_streams_topic test_group consumerB 2100886 1716087659292-0
1) 1) "1716087659292-0"
2) 1) "key2"
2) "val2"
127.0.0.1:6379> XPENDING test_streams_topic test_group
1) (integer) 5
2) "1716087659292-0"
3) "1716087870175-0"
4) 1) 1) "consumer" # 消息只剩4条
2) "4"
2) 1) "consumerB" # 增加了消费者B
2) "1"
# 消息1716087659292-0已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 consumerB
1) 1) "1716087659292-0"
2) "consumerB"
3) (integer) 89181
4) (integer) 2
需要注意的是,转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,因为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功。例如,我把同一条消息转移给两个不同的消费者B和C,只有一个会成功:127.0.0.1:6379> XCLAIM test_streams_topic test_group consumerB 2673946 1716087665509-0
1) 1) "1716087665509-0"
2) 1) "key3"
2) "val3"
127.0.0.1:6379> XCLAIM test_streams_topic test_group consumerC 2673946 1716087665509-0
(empty array)
127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 consumerB
1) 1) "1716087659292-0"
2) "consumerB"
3) (integer) 628050
4) (integer) 2
2) 1) "1716087665509-0"
2) "consumerB"
3) (integer) 118434
4) (integer) 2
127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 consumerC
(empty array)
需要说明的是,在消息转移的时候,有一个属性是消息被读取次数,delivery counter,它的作用是统计消息被读取的次数,包括被消息转移。这个属性主要用在判定是否为错误数据上。