ae

在函数中,调用aeApiPoll来监控事件,该函数封装了select、kqueue、epoll三种机制

numevents = aeApiPoll(eventLoop, tvp);

epoll是poll的升级版本,把描述符列表交给内核,一旦有事件发生,内核把发生事件的描述符列表通知给进程,这样就避免了轮询整个描述符列表。效率极大提高

setsize:指定事件循环要监听的文件描述符集合的大小。这个值与配置文件中得maxclients有关。

      events:存放所有注册的读写事件,是大小为setsize的数组。内核会保证新建连接的fd是当前可用描述符的最小值,所以最多监听setsize个描述符,那么最大的fd就是setsize - 1。这种组织方式的好处是,可以以fd为下标,索引到对应的事件,在事件触发后根据fd快速查找到对应的事件。

    fired:存放触发的读写事件。同样是setsize大小的数组。

      timeEventHead:redis将定时器事件组织成链表,这个属性指向表头。

      apidata:存放epoll、select等实现相关的数据。

      beforesleep:事件循环在每次迭代前会调用beforesleep执行一些异步处理

 
  • // <MM>

  • // setsize指定事件循环监听的fd的数目

  • // 由于内核保证新创建的fd是最小的正整数,所以直接创建setsize大小

  • // 的数组,存放对应的even

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
 
    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

epoll的实现如下,就是调用epoll_wait,函数返回后,会将触发的事件存放到state->events数组中的前numevents个元素。接下来,填充fired数组,设置每个触发事件的fd,以及事件类型。

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;                   //监听的最大文件号  
    int setsize;                 //可以注册的事件的上限,默认为1024*10
    long long timeEventNextId;    //定时器事件的ID编号管理(分配ID号所用)  
    time_t lastTime;             /* Used to detect system clock skew */
    aeFileEvent *events;         //注册的文件事件,这些是需要进程关注的文件  
    aeFiredEvent *fired;         //poll结果,待处理的文件事件的文件号和事件类型  
    aeTimeEvent *timeEventHead;    //定时器时间链表
    int stop;                     //时间轮询是否结束?  
    void *apidata;                 //文件事件的轮询数据和结果数据:poll; 三种轮询方式:epoll(linux),select(windows),kqueue  
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

Redis 中的事件循环

2016年12月28日 09:57:52 Joker_Ye 阅读数:1322 标签: Redis 循环 服务 事件 IO

在目前的很多服务中,由于需要持续接受客户端或者用户的输入,所以需要一个事件循环来等待并处理外部事件,这篇文章主要会介绍 Redis 中的事件循环是如何处理事件的。

在文章中,我们会先从 Redis 的实现中分析事件是如何被处理的,然后用更具象化的方式了解服务中的不同模块是如何交流的。

aeEventLoop

在分析具体代码之前,先了解一下在事件处理中处于核心部分的 aeEventLoop 到底是什么:

ae

reids-eventloop

aeEventLoop 在 Redis 就是负责保存待处理文件事件和时间事件的结构体,其中保存大量事件执行的上下文信息,同时持有三个事件数组:

  • aeFileEvent
  • aeTimeEvent
  • aeFiredEvent

aeFileEvent 和 aeTimeEvent 中会存储监听的文件事件和时间事件,而最后的 aeFiredEvent 用于存储待处理的文件事件,我们会在后面的章节中介绍它们是如何工作的。

Redis 服务中的 EventLoop

在 redis-server 启动时,首先会初始化一些 redis 服务的配置,最后会调用 aeMain 函数陷入 aeEventLoop 循环中,等待外部事件的发生:

 
  1. int main(int argc, char **argv) {

  2. ...

  3.  
  4. aeMain(server.el);

  5. }

aeMain 函数其实就是一个封装的 while 循环,循环中的代码会一直运行直到 eventLoop 的 stop 被设置为 true :

 
  1. void aeMain(aeEventLoop *eventLoop) {

  2. eventLoop->stop = 0;

  3. while (!eventLoop->stop) {

  4. if (eventLoop->beforesleep != NULL)

  5. eventLoop->beforesleep(eventLoop);

  6. aeProcessEvents(eventLoop, AE_ALL_EVENTS);

  7. }

  8. }

它会不停尝试调用 aeProcessEvents 对可能存在的多种事件进行处理,而 aeProcessEvents 就是实际用于处理事件的函数:

 
  1. int aeProcessEvents(aeEventLoop *eventLoop, int flags) {

  2. int processed = 0, numevents;

  3.  
  4. if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

  5.  
  6. if (eventLoop->maxfd != -1 ||

  7. ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {

  8. struct timeval *tvp;

  9.  
  10. #1:计算 I/O 多路复用的等待时间 tvp

  11.  
  12. numevents = aeApiPoll(eventLoop, tvp);

  13. for (int j = 0; j < numevents; j++) {

  14. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

  15. int mask = eventLoop->fired[j].mask;

  16. int fd = eventLoop->fired[j].fd;

  17. int rfired = 0;

  18.  
  19. if (fe->mask & mask & AE_READABLE) {

  20. rfired = 1;

  21. fe->rfileProc(eventLoop,fd,fe->clientData,mask);

  22. }

  23. if (fe->mask & mask & AE_WRITABLE) {

  24. if (!rfired || fe->wfileProc != fe->rfileProc)

  25. fe->wfileProc(eventLoop,fd,fe->clientData,mask);

  26. }

  27. processed++;

  28. }

  29. }

  30. if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);

  31. return processed;

  32. }

上面的代码省略了 I/O 多路复用函数的等待时间,不过不会影响我们对代码的理解,整个方法大体由两部分代码组成,一部分处理文件事件,另一部分处理时间事件。

Redis 中会处理两种事件:时间事件和文件事件。

文件事件

在一般情况下, aeProcessEvents 都会先 计算最近的时间事件发生所需要等待的时间 ,然后调用 aeApiPoll 方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发:

 
  1. numevents = aeApiPoll(eventLoop, tvp);

  2. for (j = 0; j < numevents; j++) {

  3. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

  4. int mask = eventLoop->fired[j].mask;

  5. int fd = eventLoop->fired[j].fd;

  6. int rfired = 0;

  7.  
  8. if (fe->mask & mask & AE_READABLE) {

  9. rfired = 1;

  10. fe->rfileProc(eventLoop,fd,fe->clientData,mask);

  11. }

  12. if (fe->mask & mask & AE_WRITABLE) {

  13. if (!rfired || fe->wfileProc != fe->rfileProc)

  14. fe->wfileProc(eventLoop,fd,fe->clientData,mask);

  15. }

  16. processed++;

  17. }

文件事件如果绑定了对应的读/写事件,就会执行对应的对应的代码,并传入事件循环、文件描述符、数据以及掩码:

 
  1. fe->rfileProc(eventLoop,fd,fe->clientData,mask);

  2. fe->wfileProc(eventLoop,fd,fe->clientData,mask);

其中 rfileProc 和 wfileProc 就是在文件事件被创建时传入的函数指针:

 
  1. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {

  2. aeFileEvent *fe = &eventLoop->events[fd];

  3.  
  4. if (aeApiAddEvent(eventLoop, fd, mask) == -1)

  5. return AE_ERR;

  6. fe->mask |= mask;

  7. if (mask & AE_READABLE) fe->rfileProc = proc;

  8. if (mask & AE_WRITABLE) fe->wfileProc = proc;

  9. fe->clientData = clientData;

  10. if (fd > eventLoop->maxfd)

  11. eventLoop->maxfd = fd;

  12. return AE_OK;

  13. }

需要注意的是,传入的 proc 函数会在对应的 mask 位事件发生时执行。

时间事件

在 Redis 中会发生两种时间事件:

  • 一种是定时事件,每隔一段时间会执行一次;
  • 另一种是非定时事件,只会在某个时间点执行一次;

时间事件的处理在 processTimeEvents 中进行,我们会分三部分分析这个方法的实现:

 
  1. static int processTimeEvents(aeEventLoop *eventLoop) {

  2. int processed = 0;

  3. aeTimeEvent *te, *prev;

  4. long long maxId;

  5. time_t now = time(NULL);

  6.  
  7. if (now < eventLoop->lastTime) {

  8. te = eventLoop->timeEventHead;

  9. while(te) {

  10. te->when_sec = 0;

  11. te = te->next;

  12. }

  13. }

  14. eventLoop->lastTime = now;

由于对系统时间的调整会影响当前时间的获取,进而影响时间事件的执行;如果系统时间先被设置到了未来的时间,又设置成正确的值,这就会导致 时间事件会随机延迟一段时间执行 ,也就是说,时间事件不会按照预期的安排尽早执行,而 eventLoop 中的 lastTime 就是用于检测上述情况的变量:

 
  1. typedef struct aeEventLoop {

  2. ...

  3. time_t lastTime; /* Used to detect system clock skew */

  4. ...

  5. } aeEventLoop;

如果发现了系统时间被改变(小于上次 processTimeEvents 函数执行的开始时间),就会强制所有时间事件尽早执行。

 
  1. prev = NULL;

  2. te = eventLoop->timeEventHead;

  3. maxId = eventLoop->timeEventNextId-1;

  4. while(te) {

  5. long now_sec, now_ms;

  6. long long id;

  7.  
  8. if (te->id == AE_DELETED_EVENT_ID) {

  9. aeTimeEvent *next = te->next;

  10. if (prev == NULL)

  11. eventLoop->timeEventHead = te->next;

  12. else

  13. prev->next = te->next;

  14. if (te->finalizerProc)

  15. te->finalizerProc(eventLoop, te->clientData);

  16. zfree(te);

  17. te = next;

  18. continue;

  19. }

Redis 处理时间事件时,不会在当前循环中直接移除不再需要执行的事件,而是会在当前循环中将时间事件的 id 设置为 AE_DELETED_EVENT_ID ,然后再下一个循环中删除,并执行绑定的 finalizerProc 。

 
  1. aeGetTime(&now_sec, &now_ms);

  2. if (now_sec > te->when_sec ||

  3. (now_sec == te->when_sec && now_ms >= te->when_ms))

  4. {

  5. int retval;

  6.  
  7. id = te->id;

  8. retval = te->timeProc(eventLoop, id, te->clientData);

  9. processed++;

  10. if (retval != AE_NOMORE) {

  11. aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);

  12. } else {

  13. te->id = AE_DELETED_EVENT_ID;

  14. }

  15. }

  16. prev = te;

  17. te = te->next;

  18. }

  19. return processed;

  20. }

在移除不需要执行的时间事件之后,我们就开始通过比较时间来判断是否需要调用 timeProc 函数, timeProc 函数的返回值 retval 为时间事件执行的时间间隔:

  • retval == AE_NOMORE :将时间事件的 id 设置为 AE_DELETED_EVENT_ID ,等待下次 aeProcessEvents 执行时将事件清除;
  • retval != AE_NOMORE :修改当前时间事件的执行时间并重复利用当前的时间事件;

以使用 aeCreateTimeEvent 一个创建的简单时间事件为例:

aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL)

时间事件对应的函数 showThroughput 在每次执行时会返回一个数字,也就是该事件发生的时间间隔:

 
  1. int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {

  2. ...

  3. float dt = (float)(mstime()-config.start)/1000.0;

  4. float rps = (float)config.requests_finished/dt;

  5. printf("%s: %.2f\r", config.title, rps);

  6. fflush(stdout);

  7. return 250; /* every 250ms */

  8. }

这样就不需要重新 malloc 一块相同大小的内存,提高了时间事件处理的性能,并减少了内存的使用量。

我们对 Redis 中对时间事件的处理以流程图的形式简单总结一下:

ae

process-time-event

创建时间事件的方法实现其实非常简单,在这里不想过多分析这个方法,唯一需要注意的就是时间事件的 id 跟数据库中的大多数主键都是递增的:

 
  1. long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,

  2. aeTimeProc *proc, void *clientData,

  3. aeEventFinalizerProc *finalizerProc) {

  4. long long id = eventLoop->timeEventNextId++;

  5. aeTimeEvent *te;

  6.  
  7. te = zmalloc(sizeof(*te));

  8. if (te == NULL) return AE_ERR;

  9. te->id = id;

  10. aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);

  11. te->timeProc = proc;

  12. te->finalizerProc = finalizerProc;

  13. te->clientData = clientData;

  14. te->next = eventLoop->timeEventHead;

  15. eventLoop->timeEventHead = te;

  16. return id;

  17. }

事件的处理

上一章节我们已经从代码的角度对 Redis 中事件的处理有一定的了解,在这里,我想从更高的角度来观察 Redis 对于事件的处理是怎么进行的。

整个 Redis 服务在启动之后会陷入一个巨大的 while 循环,不停地执行 processEvents 方法处理文件事件 fe 和时间事件 te 。

有关 Redis 中的 I/O 多路复用模块可以看这篇文章 Redis 和 I/O 多路复用 。

当文件事件触发时会被标记为 “红色” 交由 processEvents 方法处理,而时间事件的处理都会交给 processTimeEvents 这一子方法:

ae

redis-eventloop-proces-event

在每个事件循环中 Redis 都会先处理文件事件,然后再处理时间事件直到整个循环停止, processEvents 和 processTimeEvents 作为 Redis 中发生事件的消费者,每次都会从“事件池”中拉去待处理的事件进行消费。

文件事件的处理

由于文件事件触发条件较多,并且 OS 底层实现差异性较大,底层的 I/O 多路复用模块使用了 eventLoop->aeFiredEvent 保存对应的文件描述符以及事件,将信息传递给上层进行处理,并抹平了底层实现的差异。

整个 I/O 多路复用模块在事件循环看来就是一个输入事件、输出 aeFiredEvent 数组的一个黑箱:

ae

eventloop-file-event-in-redis

在这个黑箱中,我们使用 aeCreateFileEvent 、 aeDeleteFileEvent 来添加删除需要监听的文件描述符以及事件。

在对应事件发生时,当前单元格会“变色”表示发生了可读(黄色)或可写(绿色)事件,调用 aeApiPoll 时会把对应的文件描述符和事件放入 aeFiredEvent 数组,并在 processEvents 方法中执行事件对应的回调。

时间事件的处理

时间事件的处理相比文件事件就容易多了,每次 processTimeEvents 方法调用时都会对整个 timeEventHead 数组进行遍历:

ae

process-time-events-in-redis

遍历的过程中会将时间的触发时间与当前时间比较,然后执行时间对应的 timeProc ,并根据 timeProc 的返回值修改当前事件的参数,并在下一个循环的遍历中移除不再执行的时间事件。

总结

笔者对于文章中两个模块的展示顺序考虑了比较久的时间,最后还是觉得,目前这样的顺序更易于理解。

Redis 对于事件的处理方式十分精巧,通过传入函数指针以及返回值的方式,将时间事件移除的控制权交给了需要执行的处理器 timeProc ,在 processTimeEvents 设置 aeApiPoll 超时时间也十分巧妙,充分地利用了每一次事件循环,防止过多的无用的空转,并且保证了该方法不会阻塞太长时间。

事件循环的机制并不能时间事件准确地在某一个时间点一定执行,往往会比实际约定处理的时间稍微晚一些