Stream
Redis 5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列(MQ),借鉴了 Kafka 的设计。
Redis Stream 的结构如上图所示:
- 每一个 Stream 都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。
- 每个 Stream 都有唯一的名称,它就是 Redis 的 key,首次使用
xadd
指令追加消息时自动创建。 - 每个 Stream 都可以挂多个消费组,每个消费组会有个游标
last_delivered_id
在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create
进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id
变量。 - 每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息,可以被不同消费组消费。
- 同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标
last_delivered_id
往前移动。每个消费者有一个组内唯一名称。也就是说一份消息只会被组内的一个消费者消费。 - 消费者 (Consumer) 内部会有个状态变量
pending_ids
,它记录了当前已经被客户端读取,但是还没有 ack 的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个pending_ids
变量在 Redis 官方被称之为 PEL(Pending Entries List),这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。 - 消息 ID 的形式是
timestampInMillis-sequence
,例如1527846880572-5
,它表示当前的消息在毫米时间戳1527846880572
时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数
,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。 - 消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。
常用命令
生产端
xadd
:追加消息。xdel
:删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。xrange
:获取消息列表,会自动过滤已经删除的消息。xlen
:消息长度。del
:删除 Stream。
示例:
127.0.0.1:6880> xadd streamtest * name mark age 18
"1626705954593-0"
streamtest
表示当前这个队列的名字,也就是 Redis 中的key
。*
号表示服务器自动生成 ID,name mark age 18
,是存入当前streamtest
这个队列的消息,采用的也是key/value
的存储形式。- 返回值
1626705954593-0
则是生成的消息 ID,由两部分组成:时间戳-序号
。时间戳时毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型。序号是在这个毫秒时间点内的消息序号。它也是个 64 位整型。
latest_generated_id
属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id
所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。如果不是非常特别的需求,强烈建议使用 Redis 的方案生成消息 ID,因为这种 时间戳+序号
的单调递增的 ID 方案,几乎可以满足全部的需求,但 ID 是支持自定义的。
127.0.0.1:6880> xadd streamtest * name james age 20
"1626706380924-0"
127.0.0.1:6880> xadd streamtest * name pooky age 33
"1626706393957-0"
127.0.0.1:6880> xlen streamtest
(integer) 3
127.0.0.1:6880> xrange streamtest - +
1) 1) "1626705954593-0"
2) 1) "name"
2) "mark"
3) "age"
4) "18"
2) 1) "1626706380924-0"
2) 1) "name"
2) "james"
3) "age"
4) "20"
3) 1) "1626706393957-0"
2) 1) "name"
2) "pooky"
3) "age"
4) "33"
xrange streamtest - +
中的 -
表示从消息 ID 最小的开始,+
表示到消息 ID 最大的结束。
也可以指定消息 ID 范围:
127.0.0.1:6880> xrange streamtest 1626706380924-0 1626706393957-0
1) 1) "1626706380924-0"
2) 1) "name"
2) "james"
3) "age"
4) "20"
2) 1) "1626706393957-0"
2) 1) "name"
2) "pooky"
3) "age"
4) "33"
127.0.0.1:6880> xrange streamtest - 1626706380924-0
1) 1) "1626705954593-0"
2) 1) "name"
2) "mark"
3) "age"
4) "18"
2) 1) "1626706380924-0"
2) 1) "name"
2) "james"
3) "age"
4) "20"
127.0.0.1:6880> xrange streamtest 1626706380924-0 +
1) 1) "1626706380924-0"
2) 1) "name"
2) "james"
3) "age"
4) "20"
2) 1) "1626706393957-0"
2) 1) "name"
2) "pooky"
3) "age"
4) "33"
删除指定的消息:
127.0.0.1:6880> xdel streamtest 1626706380924-0
(integer) 1
消费端
单消费者
虽然 Stream 中有消费者组的概念,但是可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread
,可以将 Stream 当成普通的消息队列 (list
) 来使用。使用 xread
时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list
)。
127.0.0.1:6880> xread count 1 streams stream2 0-0
1) 1) "stream2"
2) 1) 1) "1626706393957-0"
2) 1) "name"
2) "pooky"
3) "age"
4) "33"
count 1
表示获取一条消息streams
Redis 关键字stream2 0-0
表示从stream2
这个 Stream 中,从消息 ID 为0-0
的消息开始读取。0-0
表示从头开始读取。
从指定的消息 ID 开始读取(不包括命令中的消息 ID):
127.0.0.1:6880> xread count 2 streams stream2 1626705954593-0
1) 1) "stream2"
2) 1) 1) "1626706380924-0"
2) 1) "name"
2) "james"
3) "age"
4) "20"
2) 1) "stream2"
2) 1) 1) "1626706393957-0"
2) 1) "name"
2) "pooky"
3) "age"
4) "33"
从尾部读取最新的一条消息,$
代表从尾部读取,此时默认不返回任何消息:
127.0.0.1:6880> xread count 1 streams stream2 $
(nil)
所以最好以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来:
127.0.0.1:6880> xread block 0 count 1 streams stream2 $
block
后面的数字代表阻塞时间,单位毫秒。0
表示一直阻塞,直到有新的消息的到来。
新开一个客户端,往 stream2
中写入一条消息:
$ ./redis-cli -p
127.0.0.1:6880> xadd stream2 * name xiaoqiang age 18
"1626706489131-0"
再回到原来的客户端,就可以看到新的消息:
127.0.0.1:6880> xread block 0 count 1 streams stream2 $
1) 1) "stream2"
2) 1) 1) "1626706489131-0"
2) 1) "name"
2) "xiaoqiang"
3) "age"
4) "18"
(127.87s)
可以看到阻塞解除了,返回了新的消息内容,而且还显示了一个等待时间,这里等待了 127.87s
客户端如果想要使用 xread
进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID(Redis 是不会记住消费者消费的位置的)。下次继续调用 xread
时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。
消费组
创建消费组,需要传递起始消息 ID 参数用来初始化 last_delivered_id
变量:
127.0.0.1:6880> xgroup create stream2 cg1 0-0
stream2
指定要读取的队列cg1
消费组的名称0-0
表示从头开始消费
127.0.0.1:6880> xgroup create stream2 cg2 $
$
表示从尾部开始消费,**只接受新消息*8,当前 Stream 消息会全部忽略。
xinfo
命令查看队列的情况:
127.0.0.1:6880> xinfo stream stream2
1) "length"
2) (integer) 3 # 消息长度为 3
3) "radix-tree-keys"
4) (integer) 1 # 基数树键为 1
5) "radix-tree-nodes"
6) (integer) 2 # 基数树节点为 2
7) "last-generated-id"
8) "1626706489131-0" # 最后一个消息 ID 为 1626706489131-0
9) "groups"
10) (integer) 2 # 2 个消费组
11) "first-entry"
12) 1) 1) "1626705954593-0" # 第一个消息 ID 为 1626705954593-0
2) 1) "name"
2) "mark"
3) "age"
4) "18"
13) "last-entry"
14) 1) 1) "1626706489131-0" # 最后一个消息 ID 为 1626706489131-0
2) 1) "name"
2) "xiaoqiang"
3) "age"
4) "18"
查看队列的消费组信息:
S127.0.0.1:6880> xinfo groups stream2
1) 1) "name"
2) "cg1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
2) 1) "name"
2) "cg2"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1626706489131-0"
消费消息
有了消费组,自然还需要消费者,Stream 提供了 xreadgroup
指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。
它同 xread
一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack
指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。
127.0.0.1:6880> xreadgroup GROUP cg1 c1 count 1 streams stream2 >
1) 1) "stream2"
2) 1) 1) "1626706489131-0"
2) 1) "name"
2) "xiaoqiang"
3) "age"
4) "18"
GROUP
关键字cg1
消费组名称c1
消费者名称>
表示从当前消费组的last_delivered_id
后面开始读,每当消费者读取一条消息,last_delivered_id
变量就会前进。创建消费者组的时候设置了last_delivered_id
。count 1
表示获取一条消息。
设置阻塞等待:
127.0.0.1:6880> xreadgroup GROUP cg1 c1 block 0 count 1 streams stream2 >
和 xread
一样,block
后面的数字代表阻塞时间,单位毫秒。0
表示一直阻塞,直到有新的消息的到来。
如果同一个消费组有多个消费者,我们还可以通过 xinfo consumers
指令观察每个消费者的状态:
127.0.0.1:6880> xinfo consumers stream2 cg1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 5
5) "idle"
6) (integer) 15440
pending
表示当前消费者的 PEL 里面有多少条消息,也就是说有 5 条消息在等待 ACK 确认。idle
表示这个消费者已经空闲了多少秒了。上面的例子是空闲了 15440 秒。
确认消息:
127.0.0.1:6880> xack stream2 cg1 1626706489131-0
(integer) 1
再次查看确认消息变成了 4 条:
127.0.0.1:6880> xinfo consumers stream2 cg1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 4
5) "idle"
6) (integer) 91528
xack
允许带多个消息 ID:
127.0.0.1:6880> xack stream2 cg1 1626706489131-0 1626706489131-1
(integer) 2
更多的 Redis 的 Stream 命令参考 Redis 官方文档:
Redis 队列的几种实现
基于 List 的 LPUSH+BRPOP 的实现
优点:
足够简单,消费消息延迟几乎为零。
缺点:
- 不支持广播模式,不能重复消费,一旦消费就会被删除。
- 不支持分组消费。
- 如果线程一直阻塞在那里,Redis 客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候
blpop
和brpop
或抛出异常,所以在编写客户端消费者的时候要小心,如果 捕获到异常需要重试。 - 做消费者确认 ACK 麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个 Pending 列表,保证消息处理确认。
基于 ZSet 的实现
多用来实现延迟队列,当然也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息。
实现延迟队列时将消息序列化成一个字符串作为 zset 的 value
,这个消息的到期处理时间作为 score
。按照时间来排序,到期的消息会排在前面。消费消息时,只需要轮询 zset,获取到到期的消息进行处理即可。
订阅/发布模式
优点:
- 典型的广播模式,一个消息可以发布到多个消费者。
- 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息。
- 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。
缺点:
- 消息一旦发布,发布时若客户端不在线,则消息丢失。
- 不能保证每个消费者接收的时间是一致的。
- 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时。
可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。
基于 Stream 类型的实现
基本上已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控就需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件。
Stream 的问题
Stream 已经具备了一个消息队列的基本要素,生产者 API、消费者 API,消息 Broker,消息的确认机制等等,所以在使用消息中间件中产生的问题,这里一样也会遇到。
消息太多怎么办?
吐过消息积累太多,Stream 的链表很长,内存会不会爆掉? 而且 xdel
指令又不会删除消息,它只是给消息做了个标志位。
Redis 它提供了一个定长 Stream 功能。在 xadd
的指令提供一个定长长度 maxlen
,就可以将老的消息干掉,确保最多不超过指定长度。
消息如果忘记 ACK 会怎样?
Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存越来越大。所以消息要尽可能的快速消费并确认。
PEL 是已被消费者组获取但未被确认 (ACK) 的消息 ID 集合,这些消息处于"处理中"状态,具有以下特征:
- 已被某个消费者 (Consumer) 通过
xreadgroup
获取 - 尚未被
xack
确认处理完成 - 仍在消费者的"责任范围"内
- 会被包含在
xpenging
命令的返回结果中
PEL 如何避免消息丢失?
在消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup
的起始消息 ID 不能为参数 >
,而必须是任意有效的消息 ID,一般将参数设为 0-0
,表示读取所有的 PEL 消息以及自 last_delivered_id
之后的新消息。
死信问题
如果某个消息,不能被消费者处理,也就是不能被 XACK
,这是要长时间处于 Pending 列表中,即使被反复的转移给各个消费者也是如此。此时该消息的 delivery counter
(通过 XPENDING
可以查询到)就会累加,当累加到某个预设的临界值时,就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用 XDEL
语法,注意,这个命令并没有删除 Pending 中的消息,因此查看 Pending,消息还会在,可以在执行执行 XDEL
之后,XACK
这个消息标识其处理完毕。
专业的 MQ 中间件,例如 RabbitMQ,有死信队列的概念,当消息在一定时间内没有被消费,就会被投递到死信队列中,有专门的死信队列消费者来处理。但是 Stream 没有提供死信队列的概念,需要我们自己处理。
Stream 的高可用
Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过由于 Redis 的指令复制是异步的,在 failover 发生时,Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。
分区 Partition
Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。
什么时候选择 Redis Stream?
Stream 的消费模型借鉴了 Kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。
如果是中小项目和企业,在工作中已经使用了 Redis,在业务量不是很大,而又需要消息中间件功能的情况下,可以考虑使用 Redis 的 Stream 功能。但是如果并发量很高,资源足够支持下,还是以专业的消息中间件,比如 RocketMQ、Kafka 等来支持业务更好。