Redis Server 相关源码阅读笔记,源码文件 server.h & server.c & networking.c


1. 启动流程

1.1. main

从 Redis 的 main 函数中,可以发现在 Redis 启动时主要做了以下几个工作:

  1. 加载配置

    • 调用 initServerConfig 设置默认配置值
    • 处理命令行参数值
    • 调用 loadServerConfig 加载配置文件中的配置项覆盖默认配置
  2. 初始化服务器

    • 调用 initServer 初始化服务器
    • 调用 moduleLoadFromQueue 加载 modules 列表
    • 调用 InitServerLast 初始化线程,为了防止线程本地存储初始化与 dlopen 调用冲突的问题,故须在加载 modules 之后完成
    • 调用 loadDataFromDisk 加载先前可能存在的数据集

      优先从 AOF 文件中恢复

  3. 运行:调用 aeMain 启动 event loop

1.2. initServer

在调用 initServer 初始化服务器的过程中,Redis 主要进行以下几项初始化工作:

  1. 设置信号处理函数 —— setupSignalHandlers

  2. 初始化 server 结构体中相应 fields

    • 主要为 common configs & clients configs
  3. 调用 createSharedObjects 初始化共享 Redis 数据对象列表 shared

  4. 调用 adjustOpenFilesLimit 根据 open file limit 设置真实的最大客户端连接数 server.maxclients

    • 会尝试按需更改 file limit 以满足最大客户端连接数设置
    • 会预留 CONFIG_MIN_RESERVED_FDS(32) 个 fd 用于持久化、监听 scokets 和日志文件等
  5. 调用 aeCreateEventLoop 初始化服务器的 event loop server.el

    • 文件事件容量设置为 server.maxclients + CONFIG_MIN_RESERVED_FDS + $96$
  6. 监听配置的地址,接收来自用户的请求

    • TCP socket: listenToPort
      • 可通过 bind 设置绑定的 IP 地址,默认监听机器上的所有 IP 地址
      • 可监听的 IP 地址上限为 CONFIG_BINDADDR_MAX(16)
    • Unix domain socket: anetUnixServer
  7. 初始化 server 内嵌的数据结构

    • 初始化 server.db 数组,长度为 server.dbnum,并初始化每个 redisDb 的状态
    • 调用 evictionPoolAlloc 初始化 eviction pool
    • 初始化 Pub/Sub 客户端
    • 初始化 RDB/AOF 相关状态,调用 aofRewriteBufferReset 重置 AOF rewrite buffer
    • 调用 resetServerStats 重置服务器数据统计状态
  8. 添加事件至服务器的 event loop

    • 添加时间事件 serverCron 处理定期任务
    • 添加可读文件事件处理客户端连接及请求
      • TCP sockets: acceptTcpHandler
      • TCP sockets with TLS: acceptTLSHandler
      • Unix socket: acceptUnixHandler
    • 添加可读文件事件监听 module 中阻塞的客户端 fd
    • 设置 beforeSleep & afterSleep 函数
  9. Others

    • Open the AOF file if needed.
    • 如为 cluster 模式,则调用 clusterInit 进行相应的初始化操作
    • Initialize the script cache.
    • Initialize the scripting environment.
    • Initialize the slow log.
    • Latency monitor initialization.

2. 定期任务

2.1. serverCron

Redis 在初始化时,会添加一个 time event serverCron 至 event loop,其运行频率为 server.hz;如果配置项 dynamic_hz 设置为 yes,则该频率会随着当前客户端连接数的增加而增加,上限为 CONFIG_MAX_HZ (500)。

该函数内部分任务的运行频率并不是 server.hz,而是由 run_with_period(milliseconds) {...} 中的 milliseconds 确定。

但是从 Redis 的事件循环处理过程可知 Redis 在处理时间事件之前会先遍历处理已触发的文件事件,因此 serverCron 实际上的运行频率可能比理论上低。

在该定期执行的任务中,主要做了以下工作:

  1. 更新服务器状态,可通过 INFO 命令查看这些状态

  2. 如初始化时设置的信号处理函数 sigShutdownHandler 收到 SIGTERM,则 Redis 会在调用 prepareForShutdown 处理以下工作后退出

    • 清理 lua debugger forked、RDB saving child、module child、AOF saving child
    • 在开启 AOF 持久化时,将 AOF 缓冲区内容 flush 至 AOF 文件
    • save 开启时,保存 RDB 文件
    • 触发 module 注册的关闭时需处理事件
    • Flush slave output buffers
    • Close the listening sockets
  3. 执行客户端相关定期任务:clientsCron

    • 单次调用遍历的客户端数量
      • 当客户端数量低于 CLIENTS_CRON_MIN_ITERATIONS(5) 时,会在单次调用中遍历全部客户端
      • 当客户端数量大于 $5$ 时,会确保尽量在 $1s$ 内遍历全部客户端
    • 检查/更新 客户端状态流程
      • clientsCronHandleTimeout: 检测客户端 idle 时间是否超过设置的 timeout

        No timeout for slaves, monitors, masters, blocked and Pub/Sub clients

      • clientsCronResizeQueryBuffer: 尝试对客户端缓冲区进行缩容
      • clientsCronTrackExpansiveClients: 记录客户端输入/输出缓冲区最大值,一段时间后便会 reset 为 $0$,用于在 INFO 中展示
      • clientsCronTrackClientsMemUsage: 计算客户端内存占用,用于在 INFO 中展示
        • 服务器客户端的内存统计按客户端的类型进行分类求和,是在多次调用 clientsCron 完成,因此不是实时数据
        • 由于客户端类型可能会变化,因此需要记录上次客户端的类型及内存使用量
  4. 执行数据库相关定期任务:databasesCron

    • 清理已过期 key: activeExpireCycle (ACTIVE_EXPIRE_CYCLE_SLOW)
    • 碎片整理: activeDefragCycle
    • 当前无子进程时:
      • 缩容:redisDb->dict 载荷因子小于 $0.1$ 触发
      • 渐进式 Rehash:若配置项 activerehashingyes,且当前数据库正在进行 Rehash,则尝试进行时长为 $1ms$ 的 Rehash 操作

      上述两步均会对 redisDb->dictredisDb->expires 执行

  5. 执行持久化相关操作

    • 当前无子进程且有 AOF rewrite 需求时,调用 rewriteAppendOnlyFileBackground 在后台执行 AOF rewrite
    • 当前有子进程时,检测子进程是否终止
    • 当前子进程无任务时:
      • 根据 save 相关配置项决定当前是否要调用 rdbSaveBackground 保存数据库状态
      • 根据配置项 auto-aof-rewrite-percentageauto-aof-rewrite-min-size 决定当前是否要调用 rewriteAppendOnlyFileBackground 进行 AOF rewrite 以减少 AOF 文件磁盘空间占用
    • 依据当前是否有子进程,调整 Rehash 扩容载荷因子阈值
    • 如此前有推迟的 AOF 落盘任务或 AOF 写入发生错误,则调用 flushAppendOnlyFile 将 AOF 缓冲区内容落盘
  6. Clear the paused clients flag if needed.

  7. 执行主从同步相关定期任务:replicationCron

    • TODO
  8. 若处于 cluster mode,则执行集群相关定期任务:clusterCron

    • TODO
  9. 若处于 sentinel mode,则执行 sentinel 相关定期任务 sentinelTimer

    • TODO
  10. Others

    • Cleanup expired MIGRATE cached sockets: migrateCloseTimedoutSockets
    • Stop the I/O threads if we don’t have enough pending work: stopThreadedIOIfNeeded
    • Resize tracking keys table if needed: trackingLimitUsedSlots
    • 当前子进程无任务且有 BGSAVE 需求时,调用 rdbSaveBackground 在后台执行 BGSAVE
    • Fire the cron loop modules event: moduleFireServerEvent

2.2. beforeSleep / afterSleep

TODO

3. 请求处理 (TODO)

Redis 服务器处理客户端请求时大致经历了以下过程(以监听 TCP 端口且无 TLS 为例):

  1. 在服务器初始化时,调用 listenToPort 监听指定端口,将相应的 scoket FD 存储至 server.ipfd 数组中

    • 由于机器上可能会有多个网卡,因此可通过配置项 bind 绑定指定的网卡接口,若无设置则默认为绑定所有的网卡接口,其数量上限为 CONFIG_BINDADDR_MAX (16)
  2. server.ipfd 数组中的每个 socket FD 调用 aeCreateFileEvent,为其创建 File Event acceptTcpHandler,并添加至 event loop server.el

  3. acceptTcpHandler 中:

    • anetTcpAccept: 建立连接
    • connCreateAcceptedSocket: 包装连接 FD 得到结构体 connection
    • createClient: 若当前客户端数量不高于 server.maxclients,则使用 connection 创建客户端 client
      • connection 非空,则添加事件 readQueryFromClient 至 event loop server.el,用于读取客户端请求
      • 初始化客户端 fields
      • connection 非空,将新建客户端加入服务器的客户端列表 server.clients 中,便于管理
      • Client state initialization for MULTI/EXEC
    • connAccept: Initiate accept, the socket is ready for I/O
  4. 在 File event readQueryFromClient 中,将来自用户的请求内容写入客户端的输入缓冲区 client->querybuf

  5. 将输入缓冲区中的内容解析为 Redis Command,并填充 client->argv & client->argc: processInputBuffer

  6. 处理请求:processCommandAndResetClient

  7. 将回复内容写入客户端输出缓冲区并将当前客户端加入列表 redisServer->clients_pending_write 中,后续会遍历该列表中的客户端,将其输出缓冲区内容写至相应的 socket:addReply.*

  8. 在进入 event loop 之前 (beforeSleep),调用 handleClientsWithPendingWritesUsingThreads,在该函数中:

    • 首先调用 writeToClient 将客户端输出缓冲区的内容同步写入相应的 socket
    • 若客户端输出缓冲区仍有未写入的内容,则注册写事件 sendReplyToClient 至 event loop,以完成剩余内容的写入

    可以看出会有同步写入回复至 socket 操作,因此为满足 “write the AOF before replying to the client” 的条件,在 beforeSleep 中,handleClientsWithPendingWritesUsingThreads 的调用应在 flushAppendOnlyFile 之后

4. 客户端管理

4.1. 新建

acceptTcpHandler (acceptTLSHandler/acceptUnixHandler) 中新建连接,当客户端连接数不超过上限 server.maxclients 时,为该连接创建 client 结构体并加入 server.clients 中。

4.2. 释放

5. 缓冲区管理

5.1. 输入缓冲区

客户端使用 sds string client->querybuf 作为输入缓冲区

  • 扩容:在 readQueryFromClient 中,为了存放用户的请求内容,会调用 sdsMakeRoomFor 对输入缓冲区扩容
  • 限制:在扩容并读取内容后,若此时输入缓冲区中字符串长度大于 server.client_max_querybuf_len,会调用 freeClientAsync 异步释放该客户端。其中 server.client_max_querybuf_len 可由配置项 client-query-buffer-limit 进行设置
  • 缩容:在定期执行的 serverCron 中,会调用 clientsCronResizeQueryBuffer 对输入缓冲区进行适当地缩容操作

若客户端为 master,则还会使用 client->pending_querybuf 存储尚未执行完毕的请求,其同样在 readQueryFromClient 中扩容,在 clientsCronResizeQueryBuffer 中缩容。

5.2. 输出缓冲区

客户端的输出缓冲区分为两部分:

  • 大小固定为 PROTO_REPLY_CHUNK_BYTES(16KB) 大小的字符数组 client->buf
  • clientReplyBlock 组成的 reply 链表 client->reply,每个链表节点容量最小为 PROTO_REPLY_CHUNK_BYTES(16KB)

在将回复写入输出缓冲区时:

  • 会先调用 _addReplyToBuffer 尝试将回复内容写入 client->buf
  • 若数组剩余空间不足,则会选择调用 _addReplyProtoToList 将回复内容写入 client->reply,写入回复内容后会调用 checkClientOutputBufferLimits 检查 client->reply 使用内存大小是否超出设定的上限,若超上限则会调用 freeClientAsync 异步释放该客户端。

输出缓冲区大小(实际上只计算 client->reply,而不包含 16KB 的定长字符数组)的限制可由配置项 client-output-buffer-limit 进行设置:

  • 可以为三种客户端分别进行设置

    • normal: normal clients including MONITOR clients
    • replica: replica clients
    • pubsub: clients subscribed to at least one pubsub channel or pattern
  • client->reply 内存使用量达到 hard limit,或保持在 soft limit 超过 soft seconds 时,会调用 freeClientAsync 异步释放该客户端

  • 由于 normal 客户端属于 “pull” 方式拉取数据,因此默认不设置上限,而 replicapubsub 客户端属于 “push” 方式推送数据至指定的 replicas 或 subscribers,未防止对方消费速度低于生产速度,因此默认会设置上限

6. 解析请求 - RESP

Redis 使用 RESP 协议进行 client-server 通信,可阅读 Redis Protocol specification 了解详情。

在 RESP 中,主要分为以下几种数据类型,每种数据类型编码的终止符均为 \r\n

Type Encoding
Simple Strings +[content]\r\n (non binary safe strings)
Errors -[err-type][err-msg]\r\n (non binary safe)
Integers :[integer]\r\n
Bulk Strings $[length]\r\n[content]\r\n (binary safe, $\le 512MB$)
Null Bulk String -> $-1\r\n
Arrays *[array-count]\r\n<other-data-type>...
Null Array -> *-1\r\n
Can contain mixed types, include Null Bulk String & Arrays

上述介绍的协议其实为 RESP2,新版本 RESP3 的介绍可阅读 RESP3 specification

客户端请求通常为两种格式:

  • PROTO_REQ_MULTIBULK: a RESP Array consisting of just Bulk Strings
  • PROTO_REQ_INLINE: inline command, space-separated arguments without encoding

processInputBuffer 中解析客户端输入缓冲区的内容为完整的 Redis command 后将其填充至 client->argv

7. 处理请求

在解析完一条完整的 Redis command 并将其填充至 client->argv 后,Redis 会调用 processCommandAndResetClient 来处理该请求:

  • 其首先调用 processCommand 处理请求,当请求处理完毕后,调用 commandProcessed 更新客户端相关状态。
  • 当在处理请求过程中客户端被 free 时,该函数返回 C_ERR,否则返回 C_OK

7.1. redisCommand

redisCommand 结构体的定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;
    uint64_t flags;
    redisGetKeysProc *getkeys_proc;
    int firstkey;
    int lastkey;
    int keystep;
    long long microseconds, calls;
    int id;
};
  • name: A string representing the command name.

  • proc: Pointer to the C function implementing the command, the function prototype is:

    1
    
    typedef void redisCommandProc(client *c);
    
  • arity: Number of arguments, it is possible to use -N to say >= N

  • sflags: Command flags as string. 详情可阅读 the meaning of the flags

  • flags: Flags as bitmask. Computed by Redis using the ‘sflags’ field: populateCommandTableParseFlags

  • getkeys_proc: An optional function to get key arguments from a command. This is only used when the following three fields are not enough to specify what arguments are keys. the function prototype is:

    1
    2
    
    typedef int *redisGetKeysProc(struct redisCommand *cmd,
                                  robj **argv, int argc, int *numkeys);
    
  • firstkey: First argument that is a key (0 = no keys)

  • lastkey: Last argument that is a key

  • keystep: Step to get all the keys from first to last argument. For instance in MSET the step is two since arguments are key,val,key,val,…

  • microseconds: Microseconds of total execution time for this command.

  • calls: Total number of calls of this command.

  • id: Command bit identifier for ACLs or other goals.

其中, flagsmicrosecondscalls 由 Redis 自身计算填充,初始时均设为 0 即可。

在 Redis server 初始化时,会调用 populateCommandTable 使用全局变量 redisCommandTable 填充 server.commandsserver.orig_commands

7.2. processCommand

1
int processCommand(client *c);

processCommand 函数的处理流程大致如下:

  1. 调用 moduleCallCommandFilters 将原 Redis Command 替换成在 module 中想要替换的 Redis Command

  2. handle QUIT command

  3. 根据 command name 从 server.commands 中查找相应的 redisCommand,并验证 client->argc 的有效性

  4. Check if the user is authenticated.

  5. Check if the user can run this command according to the current ACLs.

  6. If cluster is enabled perform the cluster redirection here. However we don’t perform the redirection if:

    • The sender of this command is our master.
    • The command has no key arguments.
  7. 调用 freeMemoryIfNeededAndSafe 检查当前内存使用情况,以满足 server.maxmemory 的限制

    • 如果当前有执行超时的 lua 脚本时,为了不混淆 lua 脚本运行时和驱逐键时传播出的 DEL 命令,因此不会检查内存使用情况
    • freeMemoryIfNeededAndSafe 中,如果当前有执行超时的 lua 脚本,或者正在 loading data 时,不会释放内存,直接返回 C_OK
  8. Make sure to use a reasonable amount of memory for client side caching metadata.

  9. Don’t accept write commands if there are problems(AOF/RDB errors) persisting on disk and if this is a master instance.

  10. 当主从复制延迟小于 min-replicas-max-lag 的从库数量小于 min-replicas-to-write 时,阻止写命令

  11. 当实例为只读从库时,检测写命令是否来自 master,若不为真,则拒绝该写命令

  12. Only allow a subset of commands in the context of Pub/Sub if the connection is in RESP2 mode. With RESP3 there are no limits.

  13. Only allow commands with flag “t”, such as INFO, SLAVEOF and so on, when replica-serve-stale-data is no and we are a slave with a broken link with master.

  14. Loading DB? Return an error if the command has not the CMD_LOADING flag.

  15. Lua script too slow? Only allow a limited number of commands.

  16. 执行该命令

    • 若在 MULTI/EXEC 上下文中,则将命令加入数组 client->mstate.commands 中,返回 +QUEUED\r\n
    • 否则,调用 call(c, CMD_CALL_FULL) 处理该请求

7.2.1. call

The prototype of call is:

1
void call(client *c, int flags);
flags meaning
CMD_CALL_NONE No flags
CMD_CALL_SLOWLOG Check command speed and log in the slowlog if needed
CMD_CALL_STATS Populate command stats
CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
or if the client flags are forcing propagation.
CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset
or if the client flags are forcing propagation.
CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF
CMD_CALL_FULL Alias for SLOWLOG
CMD_CALL_NOWRAP Don’t wrap also propagate array into MULTI/EXEC:
the caller will handle it.
  • 调用 call 时,如果设置了 CMD_CALL_PROPAGATE_AOF
    • 若客户端设置了 CLIENT_FORCE_AOF,则即使该命令不改变 dataset 也会传播
    • 若客户端设置了 CLIENT_PREVENT_AOF_PROP,则即使该命令改变 dataset 也不会传播
  • 调用 call 时,如果未设置 CMD_CALL_PROPAGATE_AOF,则无论客户端设置何种标识,该命令也不会传播

CMD_CALL_PROPAGATE_REPL 同理

该函数的处理流程大致如下:

  1. server.fixed_time_expire++: 表示当前正在处理请求 call 上下文中,当判断 key 是否过期时,会使用缓存的 server.mstime(调用请求处理函数 redisCommand->proc 前更新) 作为当前时间,这样做是为了防止在处理请求多次访问同一 key 时,该 key 可能中途过期造成不一致的现象

  2. Send the command to clients in MONITOR mode if applicable. Administrative commands are considered too dangerous to be shown.

  3. 因为 call() 可能会递归调用,因此在调用 redisCommand->proc 前需要做以下准备工作,以在调用请求处理函数后相关状态可以恢复至调用前的状态:

    • 使用 client_old_flags 存储此时的 client->flags,随后清除命令传播相关标志位:CLIENT_FORCE_AOF, CLIENT_FORCE_REPL, & CLIENT_PREVENT_PROP
    • 使用 prev_also_propagate 存储此时的 server.also_propagate,随后将其初始化为空值
    • 记录此时的数据库状态 server.dirty 和时间 server.ustime
  4. 调用 redisCommand->proc 处理该请求,可能会修改 client->flagsserver.also_propagate,因此需要存储调用该函数之前的状态

  5. 按需记录慢日志,更新 Redis Command 相关状态

  6. 调用 propagate 将命令传播至 AOF 和 replications,决定是否传播由 dataset 是否有变化和相关 flags 决定:

    • 请求相关 flags CMD_CALL_PROPAGATE 是必要不充分条件
    • 客户端相关 flags CLIENT_FORCE_(AOF|REPL)CLIENT_PREVENT_(AOF|REPL)_PROP 是强制条件
  7. 使用 client_old_flagsclient->flags 恢复至调用 redisCommand->proc 前的状态

  8. 传输 server.also_propagate 中的请求至 AOF 和 replications。

    • 在处理请求时,可能会调用 alsoPropagate 将想要额外传播的请求加入 server.also_propagate 数组中
    • 只需请求相关 flags 决定是否传播,不受客户端 flags 影响,因此这一步在第 6 步之后没问题
    • 传递多条请求时,会使用 MULTI/EXEC 上下文包装,以保证原子性
  9. 使用 prev_also_propagateserver.also_propagate 恢复至调用 redisCommand->proc 前的状态

  10. If the client has keys tracking enabled for client side caching, make sure to remember the keys it fetched via this command.

  11. server.fixed_time_expire--: 离开 call 上下文

7.2.2. propagate

1
2
3
4
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves. */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
flags meaning
PROPAGATE_NONE no propagation of command at all
PROPAGATE_AOF propagate into the AOF file if is enabled
PROPAGATE_REPL propagate into the replication link

7.3. commandProcessed

1
2
3
4
5
6
/* Perform necessary tasks after a command was executed:
 *
 * 1. The client is reset unless there are reasons to avoid doing it.
 * 2. In the case of master clients, the replication offset is updated.
 * 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c);