Redis client 相关源码阅读笔记,源码文件 server.h & server.c & networking.c


Redis 服务器处理客户端请求时大致经历了以下过程(以监听 TCP 端口且无 TLS 为例):

  1. 在服务器初始化时,调用 listenToPort 监听指定端口,将相应的 scoket FD 存储至 server.ipfd 数组中

    • 由于机器上可能会有多个网卡,因此可通过配置项 bind 绑定指定的网卡接口,若无设置则默认为绑定所有的网卡接口,其数量上限为 CONFIG_BINDADDR_MAX (16)
  2. server.ipfd 数组中的每个 socket FD 调用 aeCreateFileEvent,为其创建 File Event acceptTcpHandler,并添加至 event loop server.el

  3. acceptTcpHandler 中:

    • anetTcpAccept: 建立连接
    • connCreateAcceptedSocket: 包装连接 FD 得到结构体 connection
    • createClient: 若当前客户端数量不高于 server.maxclients,则使用 connection 创建客户端 client
      • connection 非空,则添加事件 readQueryFromClient 至 event loop server.el,用于读取客户端请求
      • 初始化客户端 fields
      • connection 非空,将新建客户端加入服务器的客户端列表 server.clients 中,便于管理
      • Client state initialization for MULTI/EXEC
    • connAccept: Initiate accept, the socket is ready for I/O
  4. 在 File event readQueryFromClient 中,将来自用户的请求内容写入客户端的输入缓冲区 client->querybuf

  5. 将输入缓冲区中的内容解析为 Redis Command,并填充 client->argv & client->argc: processInputBuffer

  6. 处理请求:processCommandAndResetClient

  7. 将回复内容写入客户端输出缓冲区并将当前客户端加入列表 redisServer->clients_pending_write 中,后续会遍历该列表中的客户端,将其输出缓冲区内容写至相应的 socket:addReply.*

  8. 在进入 event loop 之前 (beforeSleep),调用 handleClientsWithPendingWritesUsingThreads,在该函数中:

    • 首先调用 writeToClient 将客户端输出缓冲区的内容同步写入相应的 socket
    • 若客户端输出缓冲区仍有未写入的内容,则注册写事件 sendReplyToClient 至 event loop,以完成剩余内容的写入

    可以看出会有同步写入回复至 socket 操作,因此为满足 “write the AOF before replying to the client” 的条件,在 beforeSleep 中,handleClientsWithPendingWritesUsingThreads 的调用应在 flushAppendOnlyFile 之后

客户端管理

新建

acceptTcpHandler (acceptTLSHandler/acceptUnixHandler) 中新建连接,当客户端连接数不超过上限 server.maxclients 时,为该连接创建 client 结构体并加入 server.clients 中。

释放

缓冲区管理

输入缓冲区

客户端使用 sds string client->querybuf 作为输入缓冲区

  • 扩容:在 readQueryFromClient 中,为了存放用户的请求内容,会调用 sdsMakeRoomFor 对输入缓冲区扩容
  • 限制:在扩容并读取内容后,若此时输入缓冲区中字符串长度大于 server.client_max_querybuf_len,会调用 freeClientAsync 异步释放该客户端。其中 server.client_max_querybuf_len 可由配置项 client-query-buffer-limit 进行设置
  • 缩容:在定期执行的 serverCron 中,会调用 clientsCronResizeQueryBuffer 对输入缓冲区进行适当地缩容操作

若客户端为 master,则还会使用 client->pending_querybuf 存储尚未执行完毕的请求,其同样在 readQueryFromClient 中扩容,在 clientsCronResizeQueryBuffer 中缩容。

输出缓冲区

客户端的输出缓冲区分为两部分:

  • 大小固定为 PROTO_REPLY_CHUNK_BYTES(16KB) 大小的字符数组 client->buf
  • clientReplyBlock 组成的 reply 链表 client->reply,每个链表节点容量最小为 PROTO_REPLY_CHUNK_BYTES(16KB)

在将回复写入输出缓冲区时:

  • 会先调用 _addReplyToBuffer 尝试将回复内容写入 client->buf
  • 若数组剩余空间不足,则会选择调用 _addReplyProtoToList 将回复内容写入 client->reply,写入回复内容后会调用 checkClientOutputBufferLimits 检查 client->reply 使用内存大小是否超出设定的上限,若超上限则会调用 freeClientAsync 异步释放该客户端。

输出缓冲区大小(实际上只计算 client->reply,而不包含 16KB 的定长字符数组)的限制可由配置项 client-output-buffer-limit 进行设置:

  • 可以为三种客户端分别进行设置

    • normal: normal clients including MONITOR clients
    • replica: replica clients
    • pubsub: clients subscribed to at least one pubsub channel or pattern
  • client->reply 内存使用量达到 hard limit,或保持在 soft limit 超过 soft seconds 时,会调用 freeClientAsync 异步释放该客户端

  • 由于 normal 客户端属于 “pull” 方式拉取数据,因此默认不设置上限,而 replicapubsub 客户端属于 “push” 方式推送数据至指定的 replicas 或 subscribers,未防止对方消费速度低于生产速度,因此默认会设置上限

解析请求 - RESP

Redis 使用 RESP 协议进行 client-server 通信,可阅读 Redis Protocol specification 了解详情。

在 RESP 中,主要分为以下几种数据类型,每种数据类型编码的终止符均为 \r\n

Type Encoding
Simple Strings +[content]\r\n (non binary safe strings)
Errors -[err-type][err-msg]\r\n (non binary safe)
Integers :[integer]\r\n
Bulk Strings $[length]\r\n[content]\r\n (binary safe, $\le 512MB$)
Null Bulk String -> $-1\r\n
Arrays *[array-count]\r\n<other-data-type>...
Null Array -> *-1\r\n
Can contain mixed types, include Null Bulk String & Arrays

上述介绍的协议其实为 RESP2,新版本 RESP3 的介绍可阅读 RESP3 specification

客户端请求通常为两种格式:

  • PROTO_REQ_MULTIBULK: a RESP Array consisting of just Bulk Strings
  • PROTO_REQ_INLINE: inline command, space-separated arguments without encoding

processInputBuffer 中解析客户端输入缓冲区的内容为完整的 Redis command 后将其填充至 client->argv

处理请求

在解析完一条完整的 Redis command 并将其填充至 client->argv 后,Redis 会调用 processCommandAndResetClient 来处理该请求:

  • 其首先调用 processCommand 处理请求,当请求处理完毕后,调用 commandProcessed 更新客户端相关状态。
  • 当在处理请求过程中客户端被 free 时,该函数返回 C_ERR,否则返回 C_OK

redisCommand

redisCommand 结构体的定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;
    uint64_t flags;
    redisGetKeysProc *getkeys_proc;
    int firstkey;
    int lastkey;
    int keystep;
    long long microseconds, calls;
    int id;
};
  • name: A string representing the command name.

  • proc: Pointer to the C function implementing the command, the function prototype is:

    1
    
    typedef void redisCommandProc(client *c);
    
  • arity: Number of arguments, it is possible to use -N to say >= N

  • sflags: Command flags as string. 详情可阅读 the meaning of the flags

  • flags: Flags as bitmask. Computed by Redis using the ‘sflags’ field: populateCommandTableParseFlags

  • getkeys_proc: An optional function to get key arguments from a command. This is only used when the following three fields are not enough to specify what arguments are keys. the function prototype is:

    1
    2
    
    typedef int *redisGetKeysProc(struct redisCommand *cmd,
                                  robj **argv, int argc, int *numkeys);
    
  • firstkey: First argument that is a key (0 = no keys)

  • lastkey: Last argument that is a key

  • keystep: Step to get all the keys from first to last argument. For instance in MSET the step is two since arguments are key,val,key,val,…

  • microseconds: Microseconds of total execution time for this command.

  • calls: Total number of calls of this command.

  • id: Command bit identifier for ACLs or other goals.

其中, flagsmicrosecondscalls 由 Redis 自身计算填充,初始时均设为 0 即可。

在 Redis server 初始化时,会调用 populateCommandTable 使用全局变量 redisCommandTable 填充 server.commandsserver.orig_commands

processCommand

1
int processCommand(client *c);

processCommand 函数的处理流程大致如下:

  1. 调用 moduleCallCommandFilters 将原 Redis Command 替换成在 module 中想要替换的 Redis Command

  2. handle QUIT command

  3. 根据 command name 从 server.commands 中查找相应的 redisCommand,并验证 client->argc 的有效性

  4. Check if the user is authenticated.

  5. Check if the user can run this command according to the current ACLs.

  6. If cluster is enabled perform the cluster redirection here. However we don’t perform the redirection if:

    • The sender of this command is our master.
    • The command has no key arguments.
  7. 调用 freeMemoryIfNeededAndSafe 检查当前内存使用情况,以满足 server.maxmemory 的限制

    • 如果当前有执行超时的 lua 脚本时,为了不混淆 lua 脚本运行时和驱逐键时传播出的 DEL 命令,因此不会检查内存使用情况
    • freeMemoryIfNeededAndSafe 中,如果当前有执行超时的 lua 脚本,或者正在 loading data 时,不会释放内存,直接返回 C_OK
  8. Make sure to use a reasonable amount of memory for client side caching metadata.

  9. Don’t accept write commands if there are problems(AOF/RDB errors) persisting on disk and if this is a master instance.

  10. 当主从复制延迟小于 min-replicas-max-lag 的从库数量小于 min-replicas-to-write 时,阻止写命令

  11. 当实例为只读从库时,检测写命令是否来自 master,若不为真,则拒绝该写命令

  12. Only allow a subset of commands in the context of Pub/Sub if the connection is in RESP2 mode. With RESP3 there are no limits.

  13. Only allow commands with flag “t”, such as INFO, SLAVEOF and so on, when replica-serve-stale-data is no and we are a slave with a broken link with master.

  14. Loading DB? Return an error if the command has not the CMD_LOADING flag.

  15. Lua script too slow? Only allow a limited number of commands.

  16. 执行该命令

    • 若在 MULTI/EXEC 上下文中,则将命令加入数组 client->mstate.commands 中,返回 +QUEUED\r\n
    • 否则,调用 call(c, CMD_CALL_FULL) 处理该请求

call

The prototype of call is:

1
void call(client *c, int flags);
flags meaning
CMD_CALL_NONE No flags
CMD_CALL_SLOWLOG Check command speed and log in the slowlog if needed
CMD_CALL_STATS Populate command stats
CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
or if the client flags are forcing propagation.
CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset
or if the client flags are forcing propagation.
CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF
CMD_CALL_FULL Alias for SLOWLOG
CMD_CALL_NOWRAP Don’t wrap also propagate array into MULTI/EXEC:
the caller will handle it.
  • 调用 call 时,如果设置了 CMD_CALL_PROPAGATE_AOF
    • 若客户端设置了 CLIENT_FORCE_AOF,则即使该命令不改变 dataset 也会传播
    • 若客户端设置了 CLIENT_PREVENT_AOF_PROP,则即使该命令改变 dataset 也不会传播
  • 调用 call 时,如果未设置 CMD_CALL_PROPAGATE_AOF,则无论客户端设置何种标识,该命令也不会传播

CMD_CALL_PROPAGATE_REPL 同理

该函数的处理流程大致如下:

  1. server.fixed_time_expire++: 表示当前正在处理请求 call 上下文中,当判断 key 是否过期时,会使用缓存的 server.mstime(调用请求处理函数 redisCommand->proc 前更新) 作为当前时间,这样做是为了防止在处理请求多次访问同一 key 时,该 key 可能中途过期造成不一致的现象

  2. Send the command to clients in MONITOR mode if applicable. Administrative commands are considered too dangerous to be shown.

  3. 因为 call() 可能会递归调用,因此在调用 redisCommand->proc 前需要做以下准备工作,以在调用请求处理函数后相关状态可以恢复至调用前的状态:

    • 使用 client_old_flags 存储此时的 client->flags,随后清除命令传播相关标志位:CLIENT_FORCE_AOF, CLIENT_FORCE_REPL, & CLIENT_PREVENT_PROP
    • 使用 prev_also_propagate 存储此时的 server.also_propagate,随后将其初始化为空值
    • 记录此时的数据库状态 server.dirty 和时间 server.ustime
  4. 调用 redisCommand->proc 处理该请求,可能会修改 client->flagsserver.also_propagate,因此需要存储调用该函数之前的状态

  5. 按需记录慢日志,更新 Redis Command 相关状态

  6. 调用 propagate 将命令传播至 AOF 和 replications,决定是否传播由 dataset 是否有变化和相关 flags 决定:

    • 请求相关 flags CMD_CALL_PROPAGATE 是必要不充分条件
    • 客户端相关 flags CLIENT_FORCE_(AOF|REPL)CLIENT_PREVENT_(AOF|REPL)_PROP 是强制条件
  7. 使用 client_old_flagsclient->flags 恢复至调用 redisCommand->proc 前的状态

  8. 传输 server.also_propagate 中的请求至 AOF 和 replications。

    • 在处理请求时,可能会调用 alsoPropagate 将想要额外传播的请求加入 server.also_propagate 数组中
    • 只需请求相关 flags 决定是否传播,不受客户端 flags 影响,因此这一步在第 6 步之后没问题
    • 传递多条请求时,会使用 MULTI/EXEC 上下文包装,以保证原子性
  9. 使用 prev_also_propagateserver.also_propagate 恢复至调用 redisCommand->proc 前的状态

  10. If the client has keys tracking enabled for client side caching, make sure to remember the keys it fetched via this command.

  11. server.fixed_time_expire--: 离开 call 上下文

propagate

1
2
3
4
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves. */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
flags meaning
PROPAGATE_NONE no propagation of command at all
PROPAGATE_AOF propagate into the AOF file if is enabled
PROPAGATE_REPL propagate into the replication link

commandProcessed

1
2
3
4
5
6
/* Perform necessary tasks after a command was executed:
 *
 * 1. The client is reset unless there are reasons to avoid doing it.
 * 2. In the case of master clients, the replication offset is updated.
 * 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c);