13.0 客户端
每个与服务器连接的客户端,服务器都为这些客户端建立了相应的Client,这个结构保存了客户端当前的状态信息,以及执行相关功能时所需要用到的数据结构,其中包括:
客户端的套接字描述符客户端的名字客户端的标志值指向客户端正在使用的数据库的指针以及该数据库的号码客户端当前要执行的命令、命令参数、命令参数的个数以及指向命令实现函数的指针客户端的输入缓冲区和输出缓冲区客户端的复制状态信息,以及进行复制所需的数据结构客户端执行BRPOP、BLPOP等列表阻塞命令时使用的数据结构客户端的事务状态,以及执行WATCH命令时用到的数据结构客户端执行发布与订阅功能时用到的数据结构客户端的身份验证标志客户端的创建时间,客户端和服务器最后一次通信时间,以及客户端的输出缓冲区大小超出软件限制时间
//code0 server.h
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuff; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of cufrrent command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
user *user; /* User associated with this connection. If the
user is set to NULL the connection can do
anything (admin). */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
void *auth_callback_privdata; /* Private data that is passed when the auth
* changed callback is executed. Opaque for
* Redis Core. */
void *auth_module; /* The module that owns the callback, which is used
* to disconnect the client if the module is
* unloaded for cleanup. Opaque for Redis Core.*/
/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* In clientsCronTrackClientsMemUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which categoty the client was, in order to remove it
* before adding it the new value. */
uint64_t client_cron_last_memory_usage;
int client_cron_last_memory_type;
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
redis服务器状态结构的clients结构属性是一个链表,这个链表保存了所有与服务器连接的客户端的状态结构,需要对客户端批量或者查找某一个,都可以通过遍历完成。
//code1: server.h
struct redisServer {
// ...
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
// ...
}
13.1 客户端属性
13.1.0 名字
robj *name;
13.1.1 标志
//code2 server.h
uint64_t flags; /* Client flags: CLIENT_* macros. */
多个标志的二进制或
CLIENT_MASTER:客户端代表是一个主服务器CLIENT_SLAVE:客户端代表是一个从服务器CLIENT_MONITOR:客户端正在执行MONITOR命令CLIENT_MULTI:客户端正在执行的事务CLIENT_BLOCKED:客户端正在被BGPOP、BLPOP等命令阻塞CLIENT_UNBLOCKED:客户端已经从CLIENT_UNBLOCKED标志所表示的阻塞状态中脱离出来,不再阻塞。CLIENT_UNBLOCKED标志只能在CLIENT_BLOCKED标志所表示的阻塞状态中脱离出来CLIENT_DIRTY_CAS:事务使用WATCH命令监视的数据库键已经被修改CLIENT_CLOSE_AFTER_REPLY:用户对这个客户端执行CLIENT KILL命令,或者客户端发送给服务器的命令请求中包含错误的协议内容CLIENT_LUA:用于处理Lua脚本里面包含的Redis命令的伪客户端CLIENT_ASKING:客户端向集群节点(运行在集群模式下的服务器)发送了ASKING的命令CLIENT_CLOSE_ASAP:客户端的输出缓冲区大小超出了服务器允许的范围,服务器会在下一次serverCron函数过程中关闭客户端,避免服务器的稳定性收到这个客户端的影响。积存在输出缓冲区重的所有内容会直接被释放,不会返回给客户端CLIENT_UNIX_SOCKET:服务器使用UNIX套接字连接客户端CLIENT_DIRTY_EXEC:事务在命令入队的时候出现错误。CLIENT_DIRTY_EXEC和CLIENT_DIRTY_CAS两个标志都表示事务的安全性已经被破坏,只要这两个标记中的任意一个被打开,EXEC命令一定会执行失败。这两个标志只能在CLIENT_MULTI打开的情况下使用CLIENT_MASTER_FORCE_REPLY:Queue replies even if is masterCLIENT_FORCE_AOF:强制服务器将当前执行的命令写入AOF文件里面CLIENT_FORCE_REPL:强制主服务器将当前执行的命令复制给所有的从服务器CLIENT_PRE_PSYNC:客户端代表的是一个版本低于Redis2.8的从服务器,主服务器不能使用PSYNC与这个从服务器进行同步
//code3: server.h
#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */
#define CLIENT_MASTER (1<<1) /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define CLIENT_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
server.unblocked_clients */
#define CLIENT_LUA (1<<8) /* This is a non connected client used by Lua */
#define CLIENT_ASKING (1<<9) /* Client issued the ASKING command */
#define CLIENT_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define CLIENT_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define CLIENT_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */
#define CLIENT_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */
#define CLIENT_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */
#define CLIENT_FORCE_REPL (1<<15) /* Force replication of current cmd. */
#define CLIENT_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */
#define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
#define CLIENT_PREVENT_AOF_PROP (1<<19) /* Don't propagate to AOF. */
#define CLIENT_PREVENT_REPL_PROP (1<<20) /* Don't propagate to slaves. */
#define CLIENT_PREVENT_PROP (CLIENT_PREVENT_AOF_PROP|CLIENT_PREVENT_REPL_PROP)
#define CLIENT_PENDING_WRITE (1<<21) /* Client has output to send but a write
handler is yet not installed. */
通常命令:redis只会将那些对数据库进行了修改的命令写入到AOF,并复制到从服务器。如果一个命令没有对数据库进行任何修改,那么这个命令会被认为是只读命令,不会被写入到AOF文件,也不会被复制到从服务器PUBSUB命令:没有修改数据库,但是向频道的订阅者发送消息会带有副作用,接收到消息的客户端状态都会因为这个命令而改变,所以服务器需要用CLIENT_FORCE_AOF将命令写入AOF。未来载入AOF的过程中,服务器就可以再次执行相同的PUBSUB。SCRIPT LOAD命令:虽然没有修改数据库,但是修改了服务器的状态,也是一个带有副作用的命令,所以服务器需要用CLIENT_FORCE_AOF将命令写入AOF。
为了能让主服务器和从服务器都可以正确载入SCRIPT LOAD命令指定的脚本,服务器需要使用CLIENT_FORCE_REPL标志,强制将SCRIPT LOAD命令复制给所有的从服务器
13.1.2 输入缓冲区
可查看code0的代码:sds querybuff;输入缓冲区的大小会根据输入内容动态地缩小或者扩大,但是它最大大小不能超过1GB,否则服务器会关闭这个客户端
13.1.3 命令与命令参数
可查看code0的代码:
//code4 server.h
typedef struct client {
//...
int argc; /* Num of arguments of cufrrent command. */
robj **argv; /* Arguments of current command. */
//...
}
set key value, argc=3, argv[0]=set, argv[1]=key, argv[2]=value
13.1.3 命令的实现函数
服务器根据项argv[0]的值,在命令表中查找命令所对应的命令实现函数,找到之后会将客户端的cmd指向这个结构服务器可以使用redisCommand结构、argc、argv属性中保存的命令参数信息,调用命令实现函数,执行客户端的命令查找命令表的过程中不区分大小写:SET、set、Set、SeT都是一样的结果
//code5 server.h
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
int id; /* Command ID. This is a progressive ID starting from 0 that
is assigned at runtime, and is used in order to check
ACLs. A connection is able to execute a given command if
the user associated to the connection has this command
bit set in the bitmap of allowed commands. */
};
13.1.4 输出缓冲区
执行命令所得到的命令回复会被保存在客户端状态的输出缓冲区,有两个输出缓冲区,一个缓冲区的大小是固定的,另一个缓冲区的大小是可变的
固定缓冲区的大小用于保存长度比较小的回复
bufpos 记录buf数组目前已使用的字节数量PROTO_REPLY_CHUNK_BYTES大小为16kb 可变大小的缓冲区用于保存那些长度比较大的回复【很长的字符串、很多项列表、很多元素的集合】
list *reply,通过链表连接多个字符串对象,服务器可以为客户端保存一个非常长的命令回复,而不用收到固定大小缓冲区16kb的限制
//code6 server.h
#define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
typedef struct client {
//...
/* Response buffer */
list *reply; /* List of reply objects to send to the client. */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
//...
}
13.1.5 身份验证
authenticated=0,表示客户端未通过身份验证。需要服务器启动身份验证功能,如果启动:除了auth命令,客户端发送的其他命令都会被拒绝执行。客户端可以通过auth进行身份验证,如果认证成功,客户端状态authenticated会变成1。输入ping会得到pong的回复authenticated=1,表示客户端通过身份验证
//code7 server.h
typedef struct client {
//...
int authenticated; /* Needed when the default user requires auth. */
//...
}
13.1.6 时间属性
ctime:客户端创建时间,计算客户端和服务器连接了多少秒;client list命令里面的age记录了这个秒数lastinteraction:客户端与服务器最后一次进行互动的时间,这个互动可以是客户端的命令请求,也可以是服务器的命令回复
距离客户端与服务器最后一次进行互动以来,已经过去了多少秒,CLIENT list命令的idle记录了这个秒数 obuf_soft_limit_reached_time:输出缓冲区第一次到达软性限制soft limit的时间
//code8 server.h
typedef struct client {
//...
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
//...
}
13.2 客户端的创建与关闭
13.2.1 普通客户端的创建
通过网络连接与服务器进行连接的普通客户端,客户端使用connect函数连接到服务器时,服务器会调用连接事件处理器,伪客户端创建相应的客户端状态,并将这个新的客户端状态添加到服务器状态结构clients链表的末尾
13.2.1 普通客户端的关闭
多种原因:
客户端进程退出或者被杀死,客户端与服务器之间的网络连接被关闭了,客户端被关闭客户端向服务器发送了不符合协议格式的命令请求,客户端也会被服务器关闭客户端成为client kill命令的目标,它也会被关闭用户为服务器设置了timeout配置选项,当客户端的空转时间超过timeout选项的值,客户端被关闭。也有例外情况,主服务器和从服务器,正在被BLPOP等命令阻塞,或者正在执行SUBSCRIBE、PSUBSCRIBE等订阅命令,即使客户端空转时间超过了timeout的值,客户端也不会被服务器关闭客户端发送的命令请求超过了输入缓冲区的限制大小(默认1GB),客户端被关闭要发送给客户端的命令回复大小超过了输出缓冲区的大小,客户端被关闭
两种模式限制客户端输出缓冲区的大小
硬性限制:输出缓冲区大小超过了硬性限制所设置的大小,服务器关闭客户端软性限制:输出缓冲区超过了软性限制的大小,但是没有超过硬性限制,服务器将使用客户端结构的obuf_soft_limit_reached_time属性记录下客户端到达软性限制的起始时间。之后服务器会一直见识客户端,如果输出缓冲区大小一直超出软性限制,而且持续时间超过了服务器设定的时常,那么服务器将关闭客户端。如果在指定时间内,输出缓冲区的大小不再超过软性限制,客户端就不会关闭,且obuf_soft_limit_reached_time会被清零。 具体配置参考redis.conf: client-output-buffer-limit class hard-limit soft-limit soft-seconds
13.2.2 Lua脚本的伪客户端
服务器在初始化时创建负责执行Lua脚本中包含的Redis命令的伪客户端,并把这个伪客户端关联在服务器状态结构的lua_client属性中。该伪客户端在服务器运行的整个生命期会一直存在,只有服务器被关闭时,这个客户端才会被关闭
//code9 server.h
struct redisServer {
//...
client *lua_client;
//...
}
13.2.3 AOF文件的伪客户端
服务器在载入AOF文件时,会创建用于执行AOF文件包含的Redis命令的伪客户端,并在载入完成后会关闭这个伪客户端