ふわふわ時間

0%

Redis Streams

主要介绍 Redis Streams 数据结构及部分操作,并不涉及源码部分。


整理自 Streams: a new general purpose data structure in RedisAn update on Redis Streams developmentRedis streams as a pure data structureIntroduction to Redis Streams

什么是 Redis Streams

本质上是一个抽象日志:

  • 日志中的每条记录是结构化、可扩展的 <field, value>
  • 支持范围查询和指定读取
  • 每条记录在日志中有唯一标识,标识中包含了单调递增的时间戳信息
  • 日志可以根据需要自动清理历史记录
  • 日志保存在内存中,但是也支持持久化

和其他可模拟消息队列的数据类型 (List, Pub/Sub, Zset) 对比:

List, Pub/Sub, Zset Redis Streams
List 不能从中间获取成员,\(O(N)\) 可以从中间获取成员,\(O(logN)\)
no fan-out is possible, blocking operations on list serve a single element to a single client 可以多个 clients 使用 XREAD blocking for new message
List 中没有标识符的概念 每条 msg 都有一个唯一的 id
Pub/Sub 无法保留历史消息,只能获取连接之后的消息,不支持范围查询 可以保存在 AOF 和 RDB 中
Pub/Sub 没有 consumer group 的概念 有 consumer group,更贴近真实的业务场景
Pub/Sub 的性能和订阅某个频道的 client 数量正相关 不存在
Zset 不允许添加重复成员,不支持成员淘汰和 block 新消息操作,内存开销大 允许,支持按时间线来淘汰历史数据,支持 block 操作,基于 redix tree 和 listpack,内存开销低
Zset 支持删除任意元素 不支持从中间删除元素 (log属性),more compact and memory efficient

Redis Streams 结构

{cmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+-----+--------+  +--------+  +--------+  +--------+
| key | Entry1 |--| Entry2 |--| ...... |--| EntryN |
+-----+--------+ +--------+ +--------+ +--------+
: :
+-----+ +-------------------------------------------+
| |
+----+--------+--------+--------+--------+--------+--------+
| ID | field1 | value1 | field2 | value3 | ...... | ...... |
+----+--------+--------+--------+--------+--------+--------+
: :
| +-------------------------------+
| |
+------------------+-----------------+
| millisecond time | sequence number |
+------------------+-----------------+
  • 在每个 streams 中含有多个 entry(message)
  • 每个 entry 有一唯一的 ID,并且可包含多个 <filed, value>
    • 添加连续的 entry,如果使用相同的 field name 可以节省内存
  • ID 由 millisecond time(64bit) 和 sequence number(64bit) 两部分组成
    • 其中,millisecond time 取当前时间戳和上一条插入 streams 的 entry 的时间戳之间的较大值,保证单调性

Redis Streams API

XADD

XADD key [MAXLEN [~] number] ID field string [field string ...]

Appends a new entry into the specified streams.

  • ID 为 * 表示让 Redis 自动生成 ID
  • 如果指定消息 ID,为了维持单调递增性,后面指令的 ID 必须大于之前指令的 ID
  • 可以指定 streams 的 MAXLEN,此时 streams 类似于一个固定大小的队列
  • 添加 ~ 表示并不严格要求 MAXLEN=number

XRANGE / XREVRANGE

XRANGE key start end [COUNT count]

  • 返回结果包含 start ID 和 end ID,即闭区间
  • -+ 分别表示最小和最大 ID
  • start ID 和 end ID 可以省略 sequenceNumber 部分,此时 start ID 默认 sequenceNumber = 0,而 end ID 默认 sequenceNumber 为最大值
  • 可以添加 COUNT 参数,实现类似 scan 操作

XREVRANGE key end start [COUNT count]

  • 逆序返回结果,使用方法类似 XRANGE

XREAD: blocking for new data

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • ID为 $ 表示指定 ID 为流中目前已存的最大 ID
  • BLOCK 超时时间单位为 ms,超时时间为 0 时表示不会超时
  • 当有多个 client BLOCK 等待新 msg 时,排队方式为 FIFO

XLEN

XLEN key

查看指定流中的 entry 个数,one ID one entry.

Consumer groups

一个 consumer group 就像一个 pseudo consumer 一样从 streams 中获取数据,它有如下特点:

  • 同一消息不可能传递给多个 consumer
  • 在 consumer group 中由 consumer(client) 自身提供 name 来进行区分
  • 每个 consumer group 保存了目前尚未分发的第一条消息的 ID (即,偏移量),据此,当 consumer 请求消费时, consumer group 保证不会将先前已分发的消息发送给该 consumer
  • 消费消息需要显式地 ACK 机制,只有当该消息确认被处理,才会从 consumer group 的消息队列中删除
  • Consumer group 记录当前所有的未决消息的信息,因此保证了 consumer 在中断重连后只能重新获取之前自身未 ACK 的消息

一个 consumer group 状态的示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
+------------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+------------------------------------------+

XGROUP

创建、销毁和管理 consumer group

  • XGROUP CREATE key groupname id-or-$
    • 目前尚不支持从一个未存在的 streams 上创建 consumer group
  • XGROUP SETID key groupname id-or-$
  • XGROUP DESTROY key groupname
  • XGROUP DELCONSUMER key groupname consumername

XREADGROUP

通过 consumer group 的方式从 streams 中读取数据

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] * 首次出现的 consumer 将自动加入 consumer group,无需显式声明 * ID 为 > 表示请求尚未分发给其他 consumer 的新消息 * ID 为其他有效的数字 ID 表示请求自身尚未 ACK 的 pending messages

XACK

将一个未决消息标记为已处理

XACK key group ID [ID ...]

XPENDING

返回 consumer group 中的 pending messages 相关信息

XPENDING key group [start end count] [consumer]

XCLAIM

将满足条件的 pending messages 重分配

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

全部 consumer group 命令可查看相关页面