Redis event loop 相关源码阅读笔记,源码文件 ae.h & ae.c & ae_epoll.c & ae_evport.c & ae_kqueue.c & ae_select.c


小结

  • Redis 会按序处理各个已触发的事件,无抢占
  • 因为 Redis 在处理时间事件之前,会先处理文件事件,所以时间事件的处理时间通常比设定值晚一些
  • 文件事件的 AE_BARRIER 和时间事件的 refcount 这两种特性,在 Redis 目前的实现中尚未使用

Tips

  • 循环遍历链表时,如果需要删除链表节点,可以使用标记删除。

数据结构

文件事件

Redis 的文件事件的数据结构如下:

1
2
3
4
5
6
7
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
  • mask: 文件事件的类型,可为以下几种叠加

    • AE_NONE: 初始值,未注册读写事件
    • AE_READABLE: 读事件,监听 fd 是否可读
    • AE_WRITABLE: 写事件,监听 fd 是否可写
    • AE_BARRIER: 用于 fd 同时可读写时,控制读写处理的顺序
      • 同一个 fd 可同时读写时,一般我们会先处理读事件,这样利于我们在处理完一个查询操作后立马返回查询结果
      • AE_BARRIER 被设置时,我们会先处理写事件,再处理读事件

      按照 issues/7098 所述,且查看代码后,发现目前 Redis 好像没用到 AE_BARRIER 特性

  • rfileProc: 读事件处理函数

  • wfileProc: 写事件处理函数

  • clientData: 在 rfileProc/wfileProc 中可能会用到的数据

Fired file event 结构如下:

1
2
3
4
5
/* A fired event */
typedef struct aeFiredEvent {
    int fd;   /* file descriptor */
    int mask; /* same with aeFileEvent*/
} aeFiredEvent;

时间事件

Redis 的时间事件的数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount;  /* refcount to prevent timer events from being
                    * freed in recursive time event calls. */
} aeTimeEvent;
  • id: 时间事件的 ID,单调递增,即新产生的时间事件 ID 大于已有时间事件的 ID
  • when_sec + when_ms:时间点,记录时间事件触发的时间戳
  • timeProc:时间事件处理函数,当前时间不小于 <when_sec, when_ms> pair 时需调用 timeProc
  • finalizerProc: 时间事件终结处理函数,删除时间事件时调用
  • clientData: 在 timeProc/finalizerProc 中可能会用到的数据
  • prev / next: 分别指向前/后一个时间事件,在 event loop 中时间事件为双向链表
  • refcount: 引用计数,防止递归调用 processTimeEvents 时,时间事件被释放

    根据 pull/7253,Redis 现有实现无递归调用 processTimeEvents 的 case

Event loop

Redis 的 event loop 的数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
  • maxfd: 注册至 event loop 中的文件事件的最大 fd,其值不超过 setsize

  • setsize: event loop 中可容纳的文件事件数量上限

  • event:event loop 中文件事件按照数组形式保存,其长度为 setsize

    • 未注册部分 aeFileEvent->maskAE_NONE
  • fired: Fired file event 数组,长度同样为 setsize

  • apidata: used for polling API specific data,记录 polling API 的状态

    • Redis 的 ae 包装了不同的 I/O 多路复用函数库,在编译时会按照选择系统中性能最佳的函数库,顺序为 ae_evport -> ae_epoll -> ae_kqueue.c -> ae_select.c

    • 当使用 Linux 的 epoll 时,apidata 存储的内容为:

      1
      2
      3
      4
      
      typedef struct aeApiState {
          int epfd;
          struct epoll_event *events;
      } aeApiState;
      
  • timeEventHead: event loop 中时间事件头节点

  • timeEventNextId:已注册的时间事件的 ID 最大值

    • 每次调用 aeCreateTimeEvent 注册时间事件时加 1
    • Use: Make sure we don’t process time events created by time events in this iteration

    不过在目前的实现中这个 check 是多余的,因为新增的时间事件添加在时间事件链表头部

  • lastTime:记录 event loop 上次调用 processTimeEvents 的时间戳

    • 当发生时钟跳变时,为了不推迟时间事件的触发执行,将所有时间事件的 when_sec 设置为 0,即立马处理该事件

    因为在 Redis 中其实只会注册一个时间事件 serverCron,所以问题不大

  • stop: 值不为 0 时,停止轮询处理事件

  • beforesleep: 调用 epoll_wait 前可能会调用的函数

  • aftersleep: 调用 epoll_wait 后可能会调用的函数

  • flags: 值可以设置为 AE_DONT_WAIT,表示调用 epoll_wait 时 timeout 为 0 ,即不等待

API

文件事件

1
2
3
4
5
6
7
8
9
/* Create a file evet and add to event loop */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);

/* Delete the file event specified by fd */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);

/* Get the file event specified by fd */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);

当然,在添加/删除文件事件时,不仅会修改文件事件数组 aeEventLoop->events,而且也会修改 aeEventLoop->apidata,即 polling api state.

时间事件

1
2
3
4
5
6
7
/* Create a time event and add to event loop */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);

/* Delete a time event specified by id */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
  • 在添加时间事件时,milliseconds 指的是时间间隔
  • 在删除时间事件时,并不会真的从时间事件链表中删除该节点,而是将其 id 标记为 AE_DELETED_EVENT_ID
    • 真正的删除操作在 processTimeEvents 中实现

Event loop

1
2
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop);
  1. 首先依据 aeEventLoop->lastTime 值判断是否发生了时钟跳变,若为真,则将所有时间事件设置为立马触发执行
  2. 从前往后遍历时间事件链表,执行已触发的时间事件
    • 首先判断时间事件是否已被标记为删除,若为真,则从链表中删除该时间事件并释放内存空间
    • 在执行完时间事件处理函数后,依据返回值决定是否将该时间事件标记为删除
      • 返回值为 AE_NOMORE(-1),表示该时间事件是个定时事件,执行一次后可删除,故标记为 AE_DELETED_EVENT_ID
      • 返回值为其他值时,表示该时间事件为定期事件,返回值为执行时间间隔,故利用返回值更新 when_sec & when_ms,即下次触发事件
  3. 返回处理的时间事件数量

Tips:循环遍历链表时,如果需要删除链表节点,可以使用标记删除。

1
2
/* Process every pending time event, then every pending file event */
int aeProcessEvents(aeEventLoop *eventLoop, int flags);

形参 flags 的取值可为以下几种叠加:

Flags meaning
$0$ 啥都不干,立马返回
AE_ALL_EVENTS 执行已触发的文件事件和时间事件
AE_FILE_EVENTS 执行已触发的文件事件
AE_TIME_EVENTS 执行已触发的时间事件
AE_DONT_WAIT 调用 epoll_wait 时,timeout 设置为 $0$,不等待
AE_CALL_BEFORE_SLEEP 调用 epoll_wait 前调用 beforesleep 函数
AE_CALL_AFTER_SLEEP 调用 epoll_wait 返回后调用 aftersleep 函数

函数执行流程大致如下:

  1. 根据 flags 的设置决定调用 epoll_wait 的 timeout 值
    • timeout 为 $0$:最近将触发的时间事件时间戳不大于当前时间戳或 AE_DONT_WAIT 被设置
    • timeout 大于 $0$:最近将触发的时间事件时间戳大于当前时间戳且未设置 AE_DONT_WAIT,则 timeout 为两者差值
    • timeout 为 $-1$:不处理时间事件或时间事件链表为空且未设置 AE_DONT_WAIT,则 timeout 为 $-1$,即一直等待
  2. 调用 aeEventLoop->beforesleep
  3. 调用 epoll_wait 检查已触发的文件事件
  4. 调用 aeEventLoop->aftersleep
  5. 处理已触发的文件事件
    • 同一个 fd 可同时读写时,一般先处理读事件,当 AE_BARRIER 被设置时,会先处理写事件,再处理读事件
  6. 调用 processTimeEvents 处理时间事件
  7. 返回处理的文件事件和时间事件数量之和
1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

事件循环主函数,会尝试同时处理文件事件和时间事件,且在调用 epoll_wait 前后会分别调用已设置的 aeEventLoop->beforesleepaeEventLoop->aftersleep

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* Create a event loop and initialize */
aeEventLoop *aeCreateEventLoop(int setsize);

/* Delete a event loop and free */
void aeDeleteEventLoop(aeEventLoop *eventLoop);

/* set aeEventLoop->stop = 1 */
void aeStop(aeEventLoop *eventLoop);

/* Call `poll` to wait for milliseconds until 
 * the given file descriptor becomes writable/readable/exception */
int aeWait(int fd, int mask, long long milliseconds);

/* Return the name of the multiplexing layer currently in use */
char *aeGetApiName(void);

/* Setup the before sleep process */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);

/* Setup the after sleep process */
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);

/* Return the current set size */
int aeGetSetSize(aeEventLoop *eventLoop);

/* Resize the maximum set size of the event loop. */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

/* Change aeEventLoop->flags
 * Tells the next iteration/s of the event processing to set timeout of 0 */
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);