《redis设计与实现》第二部分 (第12章:事件)

it2023-02-12  49

12.0 第12章 事件

分类 文件事件:服务器通过套接字和客户端(或者其他服务器连接),文件事件是服务器对套接字操作的抽象时间事件:redis服务器中的一些操作serverCron需要在给定的时间点执行,时间事件就是服务器对这类定时操作的抽象

12.1 文件事件

基于Redis模式开发了网络事件处理器 文件事件处理器使用I/O多路复用来监听多个套接字,根据套接字目前执行的任务来为套接字关联不同的事件处理器套接字准备好执行连接应答accept、读取read、写入write、关闭close,与操作对应的文件事件就会产生 redis 内部是单线程设计。文件事件处理器也是单线程方式运行,通过使用I/O多路复用程序来监听多个套接字,实现了高性能的网络通信模型,也可以很好的跟redis服务器中的其他以单线程方式运行的模块进行对接。

12.1.1 文件事件处理器

文件事件处理器 套接字(并发出现)I/O多路复用程序(将所有产生事件的套接字都放在一个队列,通过队列,有序同步的每次一个套接字的方式向文件事件分派器传送套接字)文件事件分派器(当上一个套接字产生的事件被处理完毕之后,I/O多路复用程序才会向文件事件分派器传送下一个套接字,根据套接字产生的事件类型,调用相应的事件处理器)事件处理器

12.1.2 I/O多路复用程序

I/O多路复用程序 每个I/O多路复用函数库在Redis源码中都对应一个单独的文件,共四个:ae_epoll.c ae_evport.c ae_kqueue.c ae_select.c,实现了相同的API。程序在编译的过程中会自动选择性能最高的I/O多路复用函数库作为Redis的I/O多路复用程序的底层实现。 所有的异步I/O子系统有不同的内部结构,但在目前的特定情况下,这些具体的异步I/O库用于支持尽可能多的平台 evport =的Solaris 10epoll的 = Linux的kqueue的 = OS X,FreeBSD的选择 =通常安装在所有平台作为fallback Evport, Epoll, and KQueue have O(1) descriptor selection algorithm complexity, and they all use internal kernel space memory structures. Also they can serve lots (hundreds of thousands) file descriptors.Apart the others, select can only serve up to 1024 descriptors, and does full scan of descriptors (so every time it iterates all descriptors to chose one to work with), so the complexity is O(n). //code0: ae.c #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif # #endif

12.1.3 事件类型

I/O多路复用程序可以监听多个套接字ae.h/AE_READABLE和ae.h/AE_WRITABLE事件 套接字变得可读,套接字产生AE_READABLE事件套接字变得可写,套接字产生AE_WRITABLE事件 如果一个套接字可读又可写,那么服务器先读套接字,再写套接字 //ref: https://github.com/chenyahui/AnnotatedCode/blob/master/redis-5.0/src/ae.c //code1: ae.c int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }

12.1.4 API

参考资料来源:https://zhuanlan.zhihu.com/p/92739237原文来源:https://www.cyhone.com/articles/analysis-of-redis-ae/有代码注释的github:https://github.com/chenyahui/AnnotatedCode/blob/master/redis-5.0/src/ae.c
12.1.4.1 事件循环器
aeCreateEventLoop: 通过aeCreateEventLoop来创建一个eventloop。在创建EventLoop的时候,必须指定一个setsize的参数。setsize参数表示了eventloop可以监听的网络事件fd的个数(不包含超时事件),如果当前监听的fd个数超过了setsize,eventloop将不能继续注册。使用连续数组来存储事件信息,创建之后将每一个event的mask属性置为AE_NONE(即是0),mask代表该fd注册了哪些事件。Linux内核会给每个进程维护一个文件描述符表。而POSIX标准对于文件描述符进行了以下约束:fd为0、1、2分别表示标准输入、标准输出和错误输出。当程序刚刚启动时候,创建监听套接字,按照标准规定,该fd的值为3。此时就直接在eventLoop->events下标为3的元素中存放相应event数据。 应用程序要在操作系统上运行,必须满足操作系统为应用程序提供的接口标准,POSIX就是操作系统支持一系列应用程序对外规定的接口标准。在开发应用程序时满足了对应的接口标准,就可以在对应的操作系统上运行。因为Redis利用了fd的这个特点,Redis只能在完全符合POSIX标准的系统中工作。其他的例如某些不符合POSIX标准的Windows系统,生成的fd或者说HANDLE更像是个指针,并不符合POSIX标准。 redis如何指定aeCreateEventLoop的size,在server.c代码中。 maxclients代表用户配置的最大连接数,可在启动时由–maxclients指定,默认为10000。 CONFIG_FDSET_INCR 大小为128,给Redis预留一些安全空间。 //code2 server.c void initServer(void) { //... server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); if (server.el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno)); exit(1); } //...
12.1.4.2 事件的注册和删除
目前可注册的事件有三种: AE_READABLE 可读事件AE_WRITABLE 可写事件AE_BARRIER 事件的等待和处理(后面有详细的)。 aeCreateFileEvent在epoll的实现中调用了epoll_ctl函数。Redis会根据该事件对应之前的mask是否为AE_NONE,来决定使用EPOLL_CTL_ADD还是EPOLL_CTL_MOD。 可以在ae_epoll.c文件中的aeAPiAddEvent找到 aeDeleteFileEvent也使用了epoll_ctl,Redis判断用户是否是要完全删除该fd上所有事件,来决定使用EPOLL_CTL_DEL还是EPOLL_CTL_MOD。 //code3: ae.c int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; } //code4:ae_epoll.c static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
12.1.4.3 事件的等待和处理
处理等待事件的函数 aeMainaeProcessEvents 取出最近一次超时事件计算该超时时间还有多久触发等待网络事件触发或者超时处理触发的各个事件,包括网络事件和超时事件 epoll的超时时间应该设置为最近超时时间的时间间隔,如果在这段时间没有网络事件触发,超时事件也可以正常响应

12.1.5 文件事件的处理器

连接应答处理器(客户端连接服务器:关联套接字的AE_READABLE事件和命令请求处理器) 代码在networking.c/acceptTcpHandler 命令请求处理器(客户端向服务器发命令:套接字产生AE_READABLE事件) 代码在networking.c/readQueryFromClient 命令回复处理器(服务器将执行命令后的命令回复通过套接字返回给客户端:服务器会将客户端套接字的AE_WRITABLE和命令回复处理器关联;客户端准备好接收了就会产生AE_WRITABLE;命令发送完毕,服务器会解除命令回复处理器与套接字AE_WRITABLE之间的关联)example redis服务器工作中,服务器的监听套接字AE_READABLE事件正处于监听状态下,这个事件对应的处理器是连接应答处理器此时有一个Redis客户端向服务器发起连接,监听套接字将会产生AE_READABLE事件,触发连接应答处理器。处理器会对客户端的请求进行应答,然后创建客户端套接字,以及客户端的状态,并将客户端套接字的AE_READABLE事件与命令请求处理器进行关联,客户端就可以向主服务器发送命令请求客户端向主服务器发送一个命令请求,客户端套接字将会产生AE_READABLE事件,引发命令请求处理器执行,处理器读取客户端的内容,然后让相关的程序去执行。执行命令会产生相应的命令回复,为了把命令回复传送给客户端,服务器会将客户端套接字的AE_WRITABLE事件与命令回复处理器进行关联。客户端尝试读取命令回复处理器,客户端套接字将产生AE_WRITABLE事件,触发命令回复处理器执行。当命令回复处理器将命令回复全部写入套接字之后,服务器会解除客户端套接字AE_WRITABLE事件与命令回复处理器之间的关联。

12.2 时间事件

两类 定时事件:在指定时间之后执行一次(当前时间30ms之后执行)周期性事件:每隔一段时间执行一次(每隔30ms执行一次) 组成 id:服务器为时间事件创建的全局唯一ID(标识号)when:long long类型,毫秒精度的unix时间戳,记录时间事件的到达时间timeProc:时间事件处理器aeTimeProc 时间事件处理器返回ae.h/AE_NOMORE,这个事件为定时事件。达到一次之后会被删除(调用finalizerProc)返回整数值,则为周期事件,会对改时间事件的when属性进行更新,让这个事件之后再次到达

12.2.1 具体实现

无序双向链表。最新的超时事件,直接插入到链表的头部。AE要遍历当前时刻的超时事件,会直接暴力从头到尾遍历链表,查看是否有超时事件(确保所有的事件事件都被处理)。

12.2.2 API

aeCreateTimeEvent:创建时间事件 //code5:ae.c aeCreateTimeEvent //新的事件在当前时间的milliseconds毫秒后到达 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; // milliseconds仅仅是一个间隔 // when_sec和when_ms代表事件触发的具体时刻 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; } aeSearchNearestTimer: 返回到达时间距离当前时间最接近的那个时间事件 //code6:ae.c static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL; while(te) { if (!nearest || te->when_sec < nearest->when_sec || (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) nearest = te; te = te->next; } return nearest; } processTimeEvents: 处理时间事件,可以去看ae.c的代码 1.如果事件te标记为删除事件,则直接删除,看te->next,te=te->next2.如果te需要被执行,调用timeProc,根据其返回值,将te标记。继续循环执行1,直到遍历完该链表

12.2.3 serverCron函数(server.c)

按顺序摘录部分代码注释: If we have many clients, we want to call serverCron() with an higher frequency.do a few operations on clients asynchronouslyRecord the max memory used since the server was startedHandle background operations on Redis databasesTrigger an AOF rewrite if neededCleanup expired MIGRATE cached socketsgive priority to RDB savings for replication

12.3 事件的调度和执行

服务器有两种事件:文件事件和时间事件,具体的函数是ae.c/aeProcessEvents时间事件 aeProcessEvents函数调用了aeSearchNearestTimer找到最接近当前时间的时间事件(参考代码,相关的值更新在tvp变量中)aeProcessEvents函数调用了aeApiPoll,设置其最大的阻塞事件由tvp(到达时间最接近当前时间的时间事件决定),可以放置服务器对时间事件频繁的轮询查询,确保aeApiPoll不会阻塞过长时间 文件事件 随机出现,如果处理完一次文件事件没有时间事件到达,那服务器将继续等待处理文件事件,直到慢慢逼近时间事件所设置的到达时间,最终时间到达后,服务器就开始处理时间事件 原则 两个事件的处理同步、有序、原子地执行。服务器不会中途中断事件处理,也不会抢占。文件事件和时间事件都会尽可能减少程序的阻塞事件,有需要时主动让出执行权,降低事件饥饿的可能 比如说写入操作,一次性要写很多超过了预设常量。命令回复处理器会主动用break跳出写入循环,将余下的数据留到下次再写时间事件如果很耗时,比如说持久化rdb和aof,会将其放在子线程或者子进程执行 时间事件在文件之后执行,所以一般实际处理的事件比预设要晚。因为时间事件该执行了,但是文件事件正在进行中,就需要等其结束
最新回复(0)