127.0.0.1:6379> XPENDING test_streams_topic test_group 1) (integer) 6 # 6个已读取但未处理的消息 2) "1716087652827-0" # 起始ID 3) "1716087870175-0" # 结束ID 4) 1) 1) "consumer" # 消费者consumer 2) "6"获取详细详细:使用 start end count 选项可以获取详细信息('-'表示从最小开始, '+'表示从最大开始,10 表示条数)
127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 1) 1) "1716087652827-0" # 消息ID 2) "consumer" # 消息ID 3) (integer) 455861 # 从读取到现在经历了455861ms 4) (integer) 1 # 消息被读取了1次 2) 1) "1716087659292-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 3) 1) "1716087665509-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 4) 1) "1716087670665-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 5) 1) "1716087677216-0" 2) "consumer" 3) (integer) 455861 4) (integer) 1 6) 1) "1716087870175-0" 2) "consumer" 3) (integer) 313465 4) (integer) 1通过上面的返回可以看出,每个Pending的消息有4个属性:
4.消息被读取次数
127.0.0.1:6379> XACK test_streams_topic test_group 1716087652827-0 # 通知消息处理结束,用消息ID标识 (integer) 1 # 确认成功 127.0.0.1:6379> XPENDING test_streams_topic test_group # 再次查看Pending列表 1) (integer) 5 # 已读取但未处理的消息已经变为4个 2) "1716087659292-0" 3) "1716087870175-0" 4) 1) 1) "consumer" # 消息条数变成5了 2) "5"
# 转移超过2100886ms的消息id1716087659292-0到消费者B的Pending列表 127.0.0.1:6379> XCLAIM test_streams_topic test_group consumerB 2100886 1716087659292-0 1) 1) "1716087659292-0" 2) 1) "key2" 2) "val2" 127.0.0.1:6379> XPENDING test_streams_topic test_group 1) (integer) 5 2) "1716087659292-0" 3) "1716087870175-0" 4) 1) 1) "consumer" # 消息只剩4条 2) "4" 2) 1) "consumerB" # 增加了消费者B 2) "1" # 消息1716087659292-0已经转移到消费者B的Pending中。 127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 consumerB 1) 1) "1716087659292-0" 2) "consumerB" 3) (integer) 89181 4) (integer) 2需要注意的是,转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,因为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功。例如,我把同一条消息转移给两个不同的消费者B和C,只有一个会成功:
127.0.0.1:6379> XCLAIM test_streams_topic test_group consumerB 2673946 1716087665509-0 1) 1) "1716087665509-0" 2) 1) "key3" 2) "val3" 127.0.0.1:6379> XCLAIM test_streams_topic test_group consumerC 2673946 1716087665509-0 (empty array) 127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 consumerB 1) 1) "1716087659292-0" 2) "consumerB" 3) (integer) 628050 4) (integer) 2 2) 1) "1716087665509-0" 2) "consumerB" 3) (integer) 118434 4) (integer) 2 127.0.0.1:6379> XPENDING test_streams_topic test_group - + 10 consumerC (empty array)需要说明的是,在消息转移的时候,有一个属性是消息被读取次数,delivery counter,它的作用是统计消息被读取的次数,包括被消息转移。这个属性主要用在判定是否为错误数据上。