主要介绍 Redis Streams 数据结构及部分操作,并不涉及源码部分。
整理自 Streams: a new general purpose data structure in Redis、An update on Redis Streams development、Redis streams as a pure data structure 和 Introduction to Redis Streams 。
1. 什么是 Redis Streams
本质上是一个抽象日志:
- 日志中的每条记录是结构化、可扩展的
<field, value>
对 - 支持范围查询和指定读取
- 每条记录在日志中有唯一标识,标识中包含了单调递增的时间戳信息
- 日志可以根据需要自动清理历史记录
- 日志保存在内存中,但是也支持持久化
和其他可模拟消息队列的数据类型 (List, Pub/Sub, Zset) 对比:
List, Pub/Sub, Zset | Redis Streams |
---|---|
List 不能从中间获取成员,$O(N)$ | 可以从中间获取成员,$O(logN)$ |
no fan-out is possible, blocking operations on list serve a single element to a single client | 可以多个 clients 使用 XREAD blocking for new message |
List 中没有标识符的概念 | 每条 msg 都有一个唯一的 id |
Pub/Sub 无法保留历史消息,只能获取连接之后的消息,不支持范围查询 | 可以保存在 AOF 和 RDB 中 |
Pub/Sub 没有 consumer group 的概念 | 有 consumer group,更贴近真实的业务场景 |
Pub/Sub 的性能和订阅某个频道的 client 数量正相关 | 不存在 |
Zset 不允许添加重复成员,不支持成员淘汰和 block 新消息操作,内存开销大 | 允许,支持按时间线来淘汰历史数据,支持 block 操作,基于 redix tree 和 listpack,内存开销低 |
Zset 支持删除任意元素 | 不支持从中间删除元素 (log属性),more compact and memory efficient |
2. Redis Streams 结构
|
|
- 在每个 streams 中含有多个 entry(message)
- 每个 entry 有一唯一的 ID,并且可包含多个
<filed, value>
对- 添加连续的 entry,如果使用相同的 field name 可以节省内存
- ID 由 millisecond time(64bit) 和 sequence number(64bit) 两部分组成
- 其中,millisecond time 取当前时间戳和上一条插入 streams 的 entry 的时间戳之间的较大值,保证单调性
3. Redis Streams API
3.1. XADD
XADD key [MAXLEN [~] number] ID field string [field string ...]
Appends a new entry into the specified streams.
- ID 为
*
表示让 Redis 自动生成 ID - 如果指定消息 ID,为了维持单调递增性,后面指令的 ID 必须大于之前指令的 ID
- 可以指定 streams 的
MAXLEN
,此时 streams 类似于一个固定大小的队列 - 添加
~
表示并不严格要求MAXLEN=number
3.2. XRANGE / XREVRANGE
XRANGE key start end [COUNT count]
- 返回结果包含 start ID 和 end ID,即闭区间
-
和+
分别表示最小和最大 ID- start ID 和 end ID 可以省略 sequenceNumber 部分,此时 start ID 默认 sequenceNumber = 0,而 end ID 默认 sequenceNumber 为最大值
- 可以添加
COUNT
参数,实现类似scan
操作
XREVRANGE key end start [COUNT count]
- 逆序返回结果,使用方法类似
XRANGE
3.3. XREAD: blocking for new data
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- ID为
$
表示指定 ID 为流中目前已存的最大 ID - BLOCK 超时时间单位为 ms,超时时间为 0 时表示不会超时
- 当有多个 client BLOCK 等待新 msg 时,排队方式为 FIFO
3.4. XLEN
XLEN key
查看指定流中的 entry 个数,one ID one entry.
4. Consumer groups
一个 consumer group 就像一个 pseudo consumer 一样从 streams 中获取数据,它有如下特点:
- 同一消息不可能传递给多个 consumer
- 在 consumer group 中由 consumer(client) 自身提供 name 来进行区分
- 每个 consumer group 保存了目前尚未分发的第一条消息的 ID (即,偏移量),据此,当 consumer 请求消费时, consumer group 保证不会将先前已分发的消息发送给该 consumer
- 消费消息需要显式地 ACK 机制,只有当该消息确认被处理,才会从 consumer group 的消息队列中删除
- Consumer group 记录当前所有的未决消息的信息,因此保证了 consumer 在中断重连后只能重新获取之前自身未 ACK 的消息
一个 consumer group 状态的示例如下:
|
|
4.1. XGROUP
创建、销毁和管理 consumer group
XGROUP CREATE key groupname id-or-$
- 目前尚不支持从一个未存在的 streams 上创建 consumer group
XGROUP SETID key groupname id-or-$
XGROUP DESTROY key groupname
XGROUP DELCONSUMER key groupname consumername
4.2. XREADGROUP
通过 consumer group 的方式从 streams 中读取数据
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- 首次出现的 consumer 将自动加入 consumer group,无需显式声明
- ID 为
>
表示请求尚未分发给其他 consumer 的新消息 - ID 为其他有效的数字 ID 表示请求自身尚未 ACK 的 pending messages
4.3. XACK
将一个未决消息标记为已处理
XACK key group ID [ID ...]
4.4. XPENDING
返回 consumer group 中的 pending messages 相关信息
XPENDING key group [start end count] [consumer]
4.5. XCLAIM
将满足条件的 pending messages 重分配
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
全部 consumer group 命令可查看相关页面