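The previous article introduced the structure of sds and how to use it. In this article we return to the point where data is read from the socket and look at how redis turns the bytes it reads into an executed command. This article assumes you are familiar with the previous two articles; if you have not read them, please go back and read them first.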
Redis series: redis networking, a few things you need to know.
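From the networking article we know that the fd of each client is registered with epoll, and when data becomes readable the following function is eventually called: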
// Start here to see how the arguments are read from the client
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // Deferred read: the read is handed to the I/O threads so the main
    // thread has more room to do other work
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    // Count the reads that have been processed
    server.stat_total_reads_processed++;

    // Upper bound for a single read, (1024*16) by default:
    // at most 16 KB is read from the socket per call
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // A multi bulk request differs from an inline request: an inline
    // argument cannot contain a raw newline, and a space only survives
    // inside quotes, so client libraries normally use multibulk instead.
    // Here we only care about how a command travels through the server,
    // so we follow the simpler inline path first.
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    // Current length of querybuf
    qblen = sdslen(c->querybuf);
    // Track the peak length of querybuf; update it if the current length is larger
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // Make sure querybuf has room for readlen more bytes
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // Read from the client connection into querybuf and return the number of bytes read.
    // qblen is the used part of querybuf, so querybuf+qblen points at the
    // first byte that has not been written yet.
    // c->conn is a struct connection, which holds the client fd
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    // nread decides what happens next.
    // -1 means the read failed
    if (nread == -1) {
        // Still connected: nothing was available to read right now,
        // so return and wait for the next readable event
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            // A real error: log it and free the client object
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        // nread == 0 means the client closed the connection
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        // If this client is our master (the data is a replication stream),
        // also copy the data into pending_querybuf; the copy is used once the
        // last command has been executed. This is off our main path, so we
        // leave it for a later article.
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    // Advance the recorded length of querybuf by nread. The space was already
    // reserved by sdsMakeRoomFor above (see the sds article); this only
    // updates the length header.
    sdsIncrLen(c->querybuf,nread);
    // Record the time of the last interaction, used for client timeout handling.
    // At this point the socket data is in the main thread's memory.
    c->lastinteraction = server.unixtime;
    // If the client is our master, advance the replication read offset
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    // Update the total number of bytes read from the network
    server.stat_net_input_bytes += nread;
    // Limit the size of the client's query buffer
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // ci and bytes are only used to log information about the client.
        // Once the query buffer exceeds the limit (1 GB by default) the
        // client is closed and processing stops.
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        // Truncate and escape the first bytes of the buffer for logging
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        // Free the temporary strings once logged
        sdsfree(ci);
        sdsfree(bytes);
        // Free the client
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    // TCP does not guarantee that a whole message arrives in one read,
    // so the buffer read so far is not necessarily a complete command
    processInputBuffer(c);
}

The code above is mostly calls to sds functions plus checks for various boundary conditions. The main flow is that redis reads data from the socket into client->querybuf, in preparation for the next step, parsing the command.
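The pattern in readQueryFromClient (make room, write at the tail, advance the length, enforce a cap) can be illustrated without any Redis code. The following is only a minimal sketch under stated assumptions: `qbuf`, `qbuf_append`, `qbuf_has_line` and `QBUF_MAX` are hypothetical names, plain `realloc`/`memcpy` stand in for the sds calls and `connRead`, and the cap is checked before the copy rather than after the read as Redis does.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for c->querybuf: a growable byte buffer. */
typedef struct {
    char  *data;
    size_t len;   /* bytes already stored (the role of sdslen) */
    size_t cap;   /* bytes allocated */
} qbuf;

#define QBUF_MAX 64   /* illustrative cap only; the Redis default is 1 GB */

/* Append one "read" worth of bytes at the tail of the buffer.
 * Returns 0 on success, -1 if the cap would be exceeded or allocation fails. */
int qbuf_append(qbuf *b, const char *chunk, size_t n) {
    assert(b != NULL && chunk != NULL);
    if (b->len + n > QBUF_MAX) return -1;      /* role of client_max_querybuf_len */
    if (b->len + n > b->cap) {                 /* role of sdsMakeRoomFor */
        size_t newcap = (b->len + n) * 2;
        char *p = realloc(b->data, newcap);
        if (p == NULL) return -1;
        b->data = p;
        b->cap = newcap;
    }
    memcpy(b->data + b->len, chunk, n);        /* role of connRead into querybuf+qblen */
    b->len += n;                               /* role of sdsIncrLen */
    return 0;
}

/* Returns 1 if the buffer already holds at least one complete line. */
int qbuf_has_line(const qbuf *b) {
    return b->len > 0 && memchr(b->data, '\n', b->len) != NULL;
}
```

Feeding it `"GET fo"` and then `"o\r\n"` mimics a command split across two TCP segments: only after the second append does `qbuf_has_line` report a complete line, which is why the parser has to tolerate partial input.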
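The code above also mentions two request formats, inline and multibulk. Inline is mainly for simple debugging by hand, for example typing commands over telnet. Multibulk is what client libraries use because it supports arbitrary arguments: every argument is prefixed with its length, so spaces and carriage returns that the inline format cannot carry directly are still represented correctly in redis. We will not analyse the protocol on its own here; we start with the simplest case, inline, and follow a command through the whole execution path.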
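To make the difference concrete, here is a small sketch of how a request is framed in the multibulk format: `*<argc>\r\n` followed by `$<len>\r\n<bytes>\r\n` for each argument. `resp_encode` and `is_multibulk` are hypothetical helper names, not Redis functions, and arguments are treated as C strings for brevity, so embedded NUL bytes are out of scope here.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argc arguments as a multibulk request into out (capacity outlen).
 * Returns the number of bytes written (excluding the final NUL),
 * or -1 if out is too small. */
int resp_encode(char *out, size_t outlen, int argc, const char **argv) {
    assert(out != NULL && outlen > 0);
    size_t pos = 0;
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    pos += (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        /* Length prefix: this is what makes spaces and CRLF inside the
         * argument unambiguous. */
        n = snprintf(out + pos, outlen - pos, "$%zu\r\n", len);
        if (n < 0 || (size_t)n >= outlen - pos) return -1;
        pos += (size_t)n;
        if (len + 2 >= outlen - pos) return -1;   /* payload + CRLF + NUL */
        memcpy(out + pos, argv[i], len);
        pos += len;
        memcpy(out + pos, "\r\n", 2);
        pos += 2;
    }
    out[pos] = '\0';
    return (int)pos;
}

/* The same first-byte test processInputBuffer() applies to pick the format. */
int is_multibulk(const char *buf) {
    return buf[0] == '*';
}
```

Encoding `SET`, `k` and the value `a b\r\nc` yields `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$6\r\na b\r\nc\r\n`; the `$6` tells the server to take the next six bytes verbatim, so the space and the CRLF inside the value are not mistaken for separators.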
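What we have now is a plain string, which is not a convenient structure for executing a command, so the next step is to convert it into one that is. Let's continue with the code: networking.c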
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    // qb_pos is the position in the client buffer that has been consumed so far.
    // While it is smaller than the total length of the query buffer
    // there is still data to process.
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        // If clients are paused, break and wait for the next call;
        // the exception is a client that is a replica
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        // Stop while the client is blocked, for example in a blocking
        // command such as BLPOP
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        // The client already has a parsed command waiting to be executed
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        // While a script is timing out on the replica, the buffer received
        // from the master is not processed for now either
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        // The request type has not been determined yet
        if (!c->reqtype) {
            // A leading '*' marks a multibulk request
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // Anything else is an inline request
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            // This converts the bytes read from the network into an array of robj pointers
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. */
            // Gopher protocol
            if (server.gopher_enabled &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        // This is where the arguments of a multibulk request are converted
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            // Based on this flag an I/O thread stops here;
            // the command itself is executed later by the main thread
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            // This is where command execution starts.
            // On C_ERR the client is gone, so return immediately
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        // Drop the part of the sds that has been processed and keep the
        // unprocessed remainder
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

// From here the buffer is converted into arguments. The data may not form a
// complete command yet; in that case C_ERR is returned and we wait for the
// next call.
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    // Search from where the last parse ended for the first '\n'.
    // strchr returns NULL if there is none; otherwise newline points at the '\n'
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n */
    if (newline == NULL) {
        // The unparsed data may not exceed 1024*64 bytes,
        // i.e. one inline command cannot be longer than 64 KB
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError("too big inline request",c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case. */
    // Some clients end the line with \r\n; step back over the \r as well
    if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    // Length of the line that was just found
    querylen = newline-(c->querybuf+c->qb_pos);
    // Create a new sds whose value is that line
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    // Split the line into arguments: the result is an array of sds
    argv = sdssplitargs(aux,&argc);
    // Free aux
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError("unbalanced quotes in inline request",c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    // This relates to master/replica synchronisation; we will come back to
    // it later, so just leave a marker here
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * because of a PSYNC gone bad.
     *
     * However there is an exception: masters may send us just a newline
     * to keep the connection active. */
    // If this server is a replica, its master never sends it commands in the
    // inline format; the only exception is a bare newline that keeps the
    // connection alive. Otherwise the replica drops the current connection
    // and reconnects to the master
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
        setProtocolError("Master using the inline protocol. Desync?",c);
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    // Advance the read position past the line and its terminator (\n or \r\n)
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure */
    if (argc) {
        // Free the previous argv array if there is one
        if (c->argv) zfree(c->argv);
        // Allocate the new array
        c->argv = zmalloc(sizeof(robj*)*argc);
    }

    /* Create redis objects for all arguments. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        // Every argument, including the command name, is wrapped in an robj
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
    }
    // Free the temporary array (the sds strings now belong to the robj)
    zfree(argv);
    return C_OK;
}

robj *createObject(int type, void *ptr) {
    // Allocate the object
    robj *o = zmalloc(sizeof(*o));
    // Type: 0 string, 1 list, 2 set, 3 sorted set, 4 hash
    o->type = type;
    // There are 11 encodings (0 to 10, listed below); here the object is a plain string
    o->encoding = OBJ_ENCODING_RAW;
    // Pointer to the data
    o->ptr = ptr;
    // The reference count starts at 1
    o->refcount = 1;

    /* Set the LRU to the current lruclock (minutes resolution), or
     * alternatively the LFU counter. */
    // Depending on the eviction policy, store an LRU timestamp or the LFU data
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        o->lru = LRU_CLOCK();
    }
    return o;
}

typedef struct redisObject {
    // Type: 0 string, 1 list, 2 set, 3 sorted set, 4 hash.
    // A 4-bit unsigned integer
    unsigned type:4;
    // One of the 11 encodings listed below.
    // Why type and encoding are separate will be analysed later
    unsigned encoding:4;
    // A 24-bit field whose meaning depends on whether LRU or LFU is selected;
    // details in the LRU and LFU article
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    // Reference counter; we will come back to it later
    int refcount;
    // Pointer to the actual data
    void *ptr;
} robj;

// Raw string
#define OBJ_ENCODING_RAW 0     /* Raw representation */
// Integer
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
// Hash table
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
// Zipmap
#define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
// Linked list, no longer used
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
// Ziplist
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
// Integer set
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
// Skip list
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
// Embedded sds string (stored in the same allocation as the object)
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
// Linked list of ziplists
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
// Stream
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

In the source above, a raw sds is finally converted into redis objects. We only touched on the redis object briefly (createObject lives in object.c, the structure and encodings in server.h), as groundwork for the later articles on data structures.
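The conversion from one line of text to an array of strings and then to an array of objects can be sketched in a few lines. This is a simplified illustration, not the Redis implementation: `mini_obj`, `split_args`, `make_obj` and `parse_inline` are hypothetical names, the splitter only understands blanks (no quoting or escapes, unlike `sdssplitargs`), and allocation failures are handled only minimally.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, stripped-down counterpart of robj. */
typedef struct {
    int   type;      /* 0 stands in for OBJ_STRING */
    int   refcount;
    char *ptr;       /* owned, NUL-terminated copy of the argument */
} mini_obj;

/* String -> string array: split line[0..len) on spaces and tabs.
 * Stores malloc'ed copies in tokens[] (at most max) and returns the count. */
int split_args(const char *line, size_t len, char **tokens, int max) {
    int argc = 0;
    size_t i = 0;
    while (i < len && argc < max) {
        while (i < len && (line[i] == ' ' || line[i] == '\t')) i++;
        size_t start = i;
        while (i < len && line[i] != ' ' && line[i] != '\t') i++;
        if (i == start) break;                 /* only trailing blanks left */
        char *tok = malloc(i - start + 1);
        if (tok == NULL) break;
        memcpy(tok, line + start, i - start);
        tok[i - start] = '\0';
        tokens[argc++] = tok;
    }
    return argc;
}

/* String -> object: wrap one string, taking ownership of s. */
mini_obj *make_obj(char *s) {
    mini_obj *o = malloc(sizeof(*o));
    if (o == NULL) return NULL;
    o->type = 0;
    o->refcount = 1;
    o->ptr = s;
    return o;
}

/* Parse one inline request from buf. Returns argc and sets *consumed to the
 * bytes used, or returns -1 if no complete line has arrived yet. */
int parse_inline(const char *buf, size_t buflen, mini_obj **argv, int max,
                 size_t *consumed) {
    assert(buf != NULL && consumed != NULL);
    const char *nl = memchr(buf, '\n', buflen);
    if (nl == NULL) return -1;                 /* wait for more data */
    size_t linelen = (size_t)(nl - buf);
    *consumed = linelen + 1;                   /* line plus '\n' */
    if (linelen > 0 && buf[linelen - 1] == '\r') linelen--;  /* tolerate \r\n */
    char *tokens[16];
    if (max > 16) max = 16;
    int argc = split_args(buf, linelen, tokens, max);
    for (int j = 0; j < argc; j++) argv[j] = make_obj(tokens[j]);
    return argc;
}
```

Keeping the splitter ignorant of objects mirrors the split in the source: `split_args` could be reused anywhere a line needs tokenising, and `make_obj` only has to wrap a single string.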
Diagram of the data structure conversion:
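Think about why the redis author does not parse directly from stage 1 (the sds string) to stage 3 (the array of redisObject), skipping the intermediate sds array.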
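Converting one sds string into an array of sds is naturally a split function that belongs to sds itself, so the existing function can simply be reused. The second stage then only has to wrap each single sds in a redisObject. This keeps the two steps decoupled. You can see it as a separation of responsibilities that makes the code clearer to work with, and in terms of performance it only costs some temporary allocations, which has little impact. Before going into the code that executes a command, we first need to look at the structure of a command and how commands are initialised. server.h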
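struct redisCommand {
    // Command name
    char *name;
    // Function that implements the command
    redisCommandProc *proc;
    // Number of arguments, counting the command name itself.
    // A positive N means exactly N arguments;
    // a negative -N means at least N arguments
    int arity;
    // These can be seen as tags of the command, separated by spaces.
    // For the set command, for example, the value is
    // "write use-memory @string": a write command that may use more memory
    // and belongs to the string category
    char *sflags;   /* Flags as string representation, one char per flag. */
    // The same flags as sflags, represented as a bitmask
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    // Used by redis cluster: extracts the keys from the command line so that
    // a request for a key whose slot this node does not own can be redirected
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    // Index of the argument that is the first key; 0 means no keys
    int firstkey; /* The first argument that's a key (0 = no keys) */
    // Index of the argument that is the last key. Negative values count from
    // the end, so -1 (as in del) means the last argument, i.e. any number of keys.
    // With firstkey 1 and lastkey 1, the first and last key are the same argument
    int lastkey;  /* The last argument that's a key */
    // keystep: I think this is best read as the distance between adjacent keys
    int keystep;  /* The step between first and last key */
    // microseconds is the total time spent executing this command,
    // calls is the number of times it was executed
    long long microseconds, calls;
    // The id of the command, an incrementing number
    int id;     /* Command ID. This is a progressive ID starting from 0 that
                   is assigned at runtime, and is used in order to check
                   ACLs. A connection is able to execute a given command if
                   the user associated to the connection has this command
                   bit set in the bitmap of allowed commands. */
};

redisCommandTable is similar to an enum class in Java: it sets the initial values for each redis command. Most of the commands are omitted below; the commonly used ones will be analysed in the following articles. server.c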
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},
    ........

Below is the initialisation of the commands. The command structures are stored in a dict so that the command name sent by the client can be matched against them. server.c
void initServerConfig(void) {
    ......
    // What this type is will be explained in the dict article. For now,
    // think of it as the hook the redis author leaves so that each kind of
    // dict can customise its own behaviour
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    ......

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        // Iterate over the command table
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        // Turn the sflags tags into a bitmask
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        // This relates to permissions (ACL);
        // we will analyse it in the permissions article
        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        // Map the command name to its structure in the dict
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        // A second dict that keeps the original names, so a command can still
        // be found after rename-command has renamed it.
        // Renaming commands is not recommended, though
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

With this groundwork in place, we continue with how a command is executed.
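As a rough picture of what the table and the lookup give us, here is a minimal sketch of a command table with name lookup, an arity check and dispatch through a function pointer. Everything in it is hypothetical (`mini_cmd`, `cmd_table`, `lookup_cmd`, `arity_ok`, `dispatch`, and the two toy handlers); it also uses a linear scan where Redis uses a dict, and it assumes POSIX `strcasecmp` for the case-insensitive match.

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h>   /* strcasecmp (POSIX) */

typedef int (*cmd_proc)(int argc, const char **argv);

/* Hypothetical, reduced version of struct redisCommand. */
typedef struct {
    const char *name;
    cmd_proc    proc;
    int         arity;  /* N > 0: exactly N args; N < 0: at least -N args */
} mini_cmd;

/* Toy handlers: "get" returns 1, "del" returns the number of keys given. */
static int get_proc(int argc, const char **argv) { (void)argc; (void)argv; return 1; }
static int del_proc(int argc, const char **argv) { (void)argv; return argc - 1; }

static mini_cmd cmd_table[] = {
    {"get", get_proc,  2},
    {"del", del_proc, -2},
};

/* Case-insensitive lookup by command name; NULL if unknown. */
mini_cmd *lookup_cmd(const char *name) {
    assert(name != NULL);
    size_t n = sizeof(cmd_table) / sizeof(cmd_table[0]);
    for (size_t i = 0; i < n; i++)
        if (strcasecmp(cmd_table[i].name, name) == 0) return &cmd_table[i];
    return NULL;
}

/* The argument count includes the command name itself. */
int arity_ok(const mini_cmd *cmd, int argc) {
    if (cmd->arity > 0) return argc == cmd->arity;
    return argc >= -cmd->arity;
}

/* Returns the handler's result, -1 for an unknown command,
 * -2 for a wrong number of arguments. */
int dispatch(int argc, const char **argv) {
    mini_cmd *cmd = lookup_cmd(argv[0]);
    if (cmd == NULL) return -1;
    if (!arity_ok(cmd, argc)) return -2;
    return cmd->proc(argc, argv);
}
```

With this table, `GET k` passes the exact-arity check, a bare `get` is rejected, and `DEL a b c` is accepted because a negative arity only sets a lower bound.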
networking.c
/* This function calls processCommand(), but also performs a few sub tasks
 * for the client that are useful in that context:
 *
 * 1. It sets the current client to the client 'c'.
 * 2. calls commandProcessed() if the command was handled.
 *
 * The function returns C_ERR in case the client was freed as a side effect
 * of processing the command, otherwise C_OK is returned. */
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    // Commands run on a single thread, so assigning current_client is enough
    // to know which client is being processed
    server.current_client = c;
    // processCommand is the main function for executing a command;
    // every path that executes a command goes through it
    if (processCommand(c) == C_OK) {
        // Follow-up work after the command has run.
        // It involves a fair amount of master/replica synchronisation,
        // which will be covered in the replication article
        commandProcessed(c);
    }
    // If the client was freed, current_client is NULL and the client is dead
    if (server.current_client == NULL) deadclient = 1;
    server.current_client = NULL;
    /* freeMemoryIfNeeded may flush slave output buffers. This may
     * result into a slave, that may be the active client, to be
     * freed. */
    return deadclient ? C_ERR : C_OK;
}

The command itself is handled in processCommand. server.c
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
    // Filters registered by loaded modules; skipped for now
    moduleCallCommandFilters(c);

    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    // The first argument is normally the command name.
    // quit is special: as a regular command it would go through the
    // replication checks and cause trouble. The comment does not say exactly
    // what trouble, so we will come back to this later
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // Look up the redisCommand structure for the command name
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // No matching command was found:
        // reply to the client with an error
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        // The number of arguments does not match
        rejectCommandFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* "write" flag */
    // Whether this is a write command
    int is_write_command = (c->cmd->flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
    /* "use-memory" flag */
    // Whether the command must be rejected when memory is exhausted
    int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
                             (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
    // Whether the command must be rejected while the data may be stale
    // (it lacks the CMD_STALE flag)
    int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
                               (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
    // Whether the command must be rejected while redis is loading data
    int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
                                 (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));

    /* Check if the user is authenticated. This check is skipped in case
     * the default user is flagged as "nopass" and is active. */
    // Permission related; discussed later
    int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
                          (DefaultUser->flags & USER_FLAG_DISABLED)) &&
                        !c->authenticated;
    if (auth_required) {
        /* AUTH and HELLO and no auth modules are valid even in
         * non-authenticated state. */
        if (!(c->cmd->flags & CMD_NO_AUTH)) {
            rejectCommand(c,shared.noautherr);
            return C_OK;
        }
    }

    /* Check if the user can run this command according to the current
     * ACLs. */
    // Permission (ACL) logic
    int acl_keypos;
    int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
    if (acl_retval != ACL_OK) {
        addACLLogEntry(c,acl_retval,acl_keypos,NULL);
        if (acl_retval == ACL_DENIED_CMD)
            rejectCommandFormat(c,
                "-NOPERM this user has no permissions to run "
                "the '%s' command or its subcommand", c->cmd->name);
        else
            rejectCommandFormat(c,
                "-NOPERM this user has no permissions to access "
                "one of the keys used as arguments");
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    // Cluster related
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the
     * propagation of DELs due to eviction. */
    // This handles the out-of-memory case; another topic left for later.
    // Before every command we check whether there is enough memory.
    // This is also the entry point of LFU/LRU eviction
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        int reject_cmd_on_oom = is_denyoom_command;
        /* If client is in MULTI/EXEC context, queuing may consume an unlimited
         * amount of memory, so we want to stop that.
         * However, we never want to reject DISCARD, or even EXEC (unless it
         * contains denied commands, in which case is_denyoom_command is already
         * set. */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand &&
            c->cmd->proc != discardCommand) {
            reject_cmd_on_oom = 1;
        }

        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * until first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
            server.lua_oom = out_of_memory;
        }
    }

    /* Make sure to use a reasonable amount of memory for client side
     * caching metadata. */
    // Related to client-side caching; we will look at it later
    if (server.tracking_clients) trackingLimitUsedSlots();

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    // If persisting to disk failed (an RDB or AOF error), write commands are refused
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (is_write_command ||c->cmd->proc == pingCommand))
    {
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            rejectCommand(c, shared.bgsaveerr);
        else
            rejectCommandFormat(c,
                "-MISCONF Errors writing to the AOF file: %s",
                strerror(server.aof_last_write_errno));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    // If there are not enough healthy replicas, write commands cannot be executed
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        is_write_command &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        rejectCommand(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    // If this server is a read-only replica, write commands are refused
    // unless they come from our master
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        is_write_command)
    {
        rejectCommand(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow a subset of commands in the context of Pub/Sub if the
     * connection is in RESP2 mode. With RESP3 there are no limits. */
    // A client in pub/sub state can only run pub/sub related commands
    if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        rejectCommandFormat(c,
            "Can't execute '%s': only (P)SUBSCRIBE / "
            "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
            c->cmd->name);
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    // If this is a replica whose link to the master is down and the command
    // is not allowed on stale data, it is refused.
    // A command like info can still run; set cannot
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        is_denystale_command)
    {
        rejectCommand(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    // While the db is loading, only specific commands can run
    if (server.loading && is_denyloading_command) {
        rejectCommand(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands.
     * Note that we need to allow the transactions commands, otherwise clients
     * sending a transaction with pipelining without error checking, may have
     * the MULTI plus a few initial commands refused, then the timeout
     * condition resolves, and the bottom-half of the transaction gets
     * executed, see Github PR #7022. */
    // While a lua script is running too slowly, only the following commands are allowed
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != helloCommand &&
          c->cmd->proc != replconfCommand &&
          c->cmd->proc != multiCommand &&
          c->cmd->proc != discardCommand &&
          c->cmd->proc != watchCommand &&
          c->cmd->proc != unwatchCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        rejectCommand(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    // After a multi command the client is in the CLIENT_MULTI state.
    // Apart from exec, discard, multi and watch,
    // every command is put into the queue
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // Queue the command for the transaction
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // This is where the command is actually executed
        call(c,CMD_CALL_FULL);
        // This relates to replication
        c->woff = server.master_repl_offset;
        // Serves clients blocked by operations such as BLPOP
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

// call is the core of command execution; we will come back to it later.
void call(client *c, int flags) {
    long long dirty;
    ustime_t start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    server.fixed_time_expire++;

    /* Send the command to clients in MONITOR mode if applicable.
     * Administrative commands are considered too dangerous to be shown. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. */
    dirty = server.dirty;
    updateCachedTime(0);
    start = server.ustime;
    // Invoke the command's own handler
    c->cmd->proc(c);
    .......

As we can see, whether a command may run depends on the state the server is in, with a different check for each case, and this is where the tags attached to each command are put to use. There are a great many branches here. To understand all of this code we need to start by analysing some commonly used commands.
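How the tags end up driving these checks can be shown with a small sketch: a space-separated flag string is turned into a bitmask once, and later checks are simple bit tests against the server state. The names here (`F_WRITE` and friends, `parse_sflags`, `mini_server`, `may_execute`) are hypothetical, only four of the real flags are modelled, and tokens starting with `@` (ACL categories) are simply skipped.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define F_WRITE    (1u << 0)
#define F_READONLY (1u << 1)
#define F_DENYOOM  (1u << 2)   /* set by the "use-memory" tag */
#define F_FAST     (1u << 3)

/* Translate a space-separated tag string into a bitmask.
 * On an unknown tag, *ok is set to 0 and 0 is returned. */
uint32_t parse_sflags(const char *sflags, int *ok) {
    assert(sflags != NULL && ok != NULL);
    uint32_t flags = 0;
    const char *p = sflags;
    *ok = 1;
    while (*p) {
        while (*p == ' ') p++;               /* skip separators */
        size_t n = strcspn(p, " ");          /* length of the next tag */
        if (n == 0) break;
        if (p[0] == '@') { /* ACL category: ignored in this sketch */ }
        else if (n == 5  && !strncmp(p, "write", 5))       flags |= F_WRITE;
        else if (n == 9  && !strncmp(p, "read-only", 9))   flags |= F_READONLY;
        else if (n == 10 && !strncmp(p, "use-memory", 10)) flags |= F_DENYOOM;
        else if (n == 4  && !strncmp(p, "fast", 4))        flags |= F_FAST;
        else { *ok = 0; return 0; }
        p += n;
    }
    return flags;
}

/* Hypothetical slice of server state relevant to two of the checks. */
typedef struct {
    int readonly_replica;
    int out_of_memory;
} mini_server;

/* Returns 1 if a command with these flags may run in this state. */
int may_execute(uint32_t flags, const mini_server *s) {
    if (s->readonly_replica && (flags & F_WRITE)) return 0;
    if (s->out_of_memory && (flags & F_DENYOOM)) return 0;
    return 1;
}
```

Parsing the tags once at startup keeps the per-command cost of every gate down to a bit test, which matters because these checks run before each command.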
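This article walked through the logic from the point where data has been read from the socket into the buffer up to the point where a command is executed. Some details are not fully explained, because the main purpose of this post is to show the path from socket handling to the command handler; the execution logic itself will be covered module by module in later articles. What this post does show is the redis author's solid grasp of the template pattern and the command pattern, which gives the code clear layers and makes later extension easier. This article is mainly groundwork for the upcoming articles on individual commands and on master/replica synchronisation, which will keep coming back here to check their logic.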
