引言


上一篇文章我们分析了Redis对与用户配置的解析,我们这一篇文章会顺着上一篇代码的位置继续分析下去。

​ 等到redis-server启动完毕,离用户最近的就是网络层了,网络层不负责命令的具体执行,只负责网络数据的收发,虽然它不负责具体的功能实现,却是Redis单线程,高性能的核心。

​ 本文只点出一条关键的代码路径,如果想要彻底理解这一部分代码,需要一些Reactor模型,epoll相关的知识,请自行查阅其他资料。

后台运行Redis


​ 这是main中配置解析结束后的第一行代码。

​ 如果你设置了--daemonize,那么Redis就会在后台启动redis.c:4027

    // 将服务器设置为守护进程
    if (server.daemonize) daemonize();

服务器启动


​ C语言网络编程中常见的服务器启动流程就是socket->bind->listen->accept,对于epoll这样的io多路复用,则是socket->bind->listen->set_non_block->register_fd_event->epoll_wait,稍微解释一下这些流程节点的含义:

​ 我们就按照这条路径来追踪服务器启动的代码。

​ 服务器启动的socket->bind->listen->set_non_block->register_fd_eventinitServer方法的调用中完成,redis.c:2050

void initServer() {
    //...
    // socket -> bind -> listen -> set_non_block
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);
    //...
    // ->register_fd_event
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    
    //...
}

​ 因为redis.conf中是允许监听多个ip地址,所以所有要监听的地址会在配置解析时被放到server结构体的bindaddr字段(字符串数组),如果bindaddrNULL则表示绑定当前机器的全部地址:

struct redisServer {
    //...
    char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
    //...
}

​ 在listenToPort中会将所有完成了socket->bind->listen->set_non_block的监听套接字放到bindaddr数组中,之后在一个循环(redis.c:2160)中将事件全部注册到epoll上。注意一下aeCreateFileEvent的第四个参数acceptTcpHandler是一个回调函数,这个回调函数会在事件发生时被调用(即有新的客户端连接请求到达时被调用)。

Redis称这种事件为File Event(从方法名CreateFileEvent可以看出),因为在linux上一切都是文件,所以套接字本身也是文件,所以Redis中的File Event就是指套接字事件。

listenToPort会遍历bindaddr数组,绑定数组中的所有ip地址+server.port

int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
         //...
         // set_non_block
         anetNonBlock(NULL,fds[*count]);
         //...
    }
    //..
}

for循环里面的代码虽然很长,但是逻辑很简单,其实就是判断是NULL,IPv6地址还是IPv4地址,如果是NULL的话,就要绑定本机的全部地址,如果是IPv6地址,则调用anetTcp6Server获取一个IPv6套接字,如果是IPv4的话,则调用anetTcpServer获取一个IPv4套接字。获得套接字后立即就调用anetNonBlock将其设置为非阻塞的(即set_non_block)。anetTcpServer将会返回一个已经完成了bindlisten操作的套接字。anetTcpServer函数其实就直接调用了一下_anetTcpServersocket->bind->listen都是在这个函数里完成的:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    //...
    for (p = servinfo; p != NULL; p = p->ai_next) {
        // socket
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        // bind->listen
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
        goto end;
    }
    //...
}

​ 到此准备工作就完成了,接下来的epoll_wait阶段,服务器就已经正式启动了,翻到main的最后几行,aeMain就启动服务器主循环的函数,redis.c:4079

    aeMain(server.el);

​ 在aeMain方法中可以看到明显的循环:

/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

​ 进入aeProcessEvents,里面代码很长,其中的aeApiPoll函数调用会阻塞在IO多路复用上(即epoll_wait),ae.c:560

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    //...
    numevents = aeApiPoll(eventLoop, tvp);
    //...
}

aeApiPoll调用完后,在eventLoopfired字段上就本轮触发的所有事件。

文件事件


​ 之前解释过Redis将所有的套接字事件都称为文件事件。

​ 刚刚提到创建文件事件的函数是aeCreateFileEvent函数,它会创建一个aeFileEvent结构体,并以fd为下标,将其放置在eventLoopevents字段(是一个aeFileEvent数组),ae.c:181

    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];

​ 其实这里本质上就是构建一个fdaeFileEvent的映射,之后在某个fd的事件触发时,方面通过fdaeFileEvent结构体取出来,然后调用里面的回调函数,从后面的代码中也可以看出aeFileEvent中存储着回调函数,ae.c:189

    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

高级语言写得比较多的程序员可能会对fd为什么能够用来做下标感到疑惑,其实对于一个进程来说,fd是从3开始递增的,0,1,2分别代表stdin, stdout和stderr。所以虽然会损失三个元素的空间,但是对于性能却是能得到不少提升的。

​ 当然,aeCreateFileEvent最重要的还是将事件在epoll上注册,ae.c:184:

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

​ 然后回调函数会在aeApiPoll返回之后,检查eventLoop.fired中的fd和事件,通过fd取出相应的aeFileEvent结构体来调用相应的回调函数,[ae.c:561]:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    //...
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        // 从已就绪数组中获取事件
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        //...
        // 读事件
        if (fe->mask & mask & AE_READABLE) {
            // rfired 确保读/写事件只能执行其中一个
            rfired = 1;
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }
        // 写事件
        if (fe->mask & mask & AE_WRITABLE) {
            if (!rfired || fe->wfileProc != fe->rfileProc)
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        }
        //...
    }
}

定时事件


​ 除了文件事件以外,Redis还有一类事件是定时事件,其实定时事件总共就只有一个,就是在initServer时注册的serverCronredis.c:2151

    // 为 serverCron() 创建时间事件   新建了一个aeTimeEvent并插入了evetloop中的时间事件列表
    // 1 表示在 1ms 后执行,这个 1ms 定义的只是 serverCron 的初始执行时间,而不是执行间隔
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }

​ 跳转到aeCreateTimeEvent,发现它做的事情就就是创建一个aeFileEvent结构体并将其插入到eventLoop的定时事件列表(即aeEventLooptimeEventHead字段)中。并没有任何epoll相关的操作,而且epoll其实也并不支持定时器的功能,那Redis是怎么实现定时任务的呢?其实秘密就在巧妙地设置epoll的超时时间上,aeApiPoll的第二个参数就是超时时间,在aeProcessEvents里调用aeApiPoll之前干的事情,就是计算距离现在最近的一次定时事件的时间,并以这个时间作为超时时间,ae.c:512

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;  // 和最近一次定时事件的间隔
        //...
    }
    numevents = aeApiPoll(eventLoop, tvp);
    //...
}

serverCron 的执行频率是由 redisServer.hz 来决定的,默认值为 10,也就是每秒执行 10 次,即每隔 100 ms 执行一次。在前面代码分析中我们看到了创建 serverCron 定时事件的代码aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) 给的定时时间似乎是 1ms,其实这是初始时间,真正的间隔时间是由时间执行函数的返回值决定的,看一看 serverCron 的返回值,你就能找到它真正的定时时间了,redis.c:1562

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //...
    return 1000/server.hz;
}

创建新连接


​ 其处理逻辑就是当时在监听套接字上注册的回调函数acceptTcpHandler

​ 每当有新的客户端连接到来时,都会分发相关事件而触发该函数。

​ 该函数会先accept这条连接,得到新连接的fd,创建一个redisClient结构体代表和这条连接相关的状态,最后注册该套接字的File Event:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;//1000

    while(max--) {  // accept 多次防止同时过来多条连接
        // accept 客户端连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // 为客户端创建客户端状态(redisClient)
        acceptCommonHandler(cfd,0);
    }
}

这里需要accept这么多次,是因为即使同时有多条连接过来,监听套接字注册的事件也只会激活一次,所以要多次accept,知道出现EWOULDBLOCK,防止有连接漏掉。

acceptCommonHandler中最重要的调用createClient方法,networking.c:752

static void acceptCommonHandler(int fd, int flags) {

    // 创建客户端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        //...
    }
    //...

createClient中先将fd设置为非阻塞,之后就在eventLoop注册了该套接字的File Event,注意这里注册的回调函数readQueryFromClient,它就是之后从客户端读取数据的函数,最后将其加入了redisServer的clients列表中:

redisClient *createClient(int fd) {
    if (fd != -1) {
        // 设置为非阻塞
        anetNonBlock(NULL,fd);
        // 禁用 Nagle 算法, 降低延迟
        anetEnableTcpNoDelay(NULL,fd);
        // 设置 keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 注册该套接字的的File event
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            //...
        }
    }
    //...
    //加入RedisServer的客户端列表中
    if (fd != -1) listAddNodeTail(server.clients,c);
    //...
}

读取客户端数据


​ 上一节提到,这里其实就是readQueryFromClient函数。

​ 读过上面的内容,大概也能猜出这个函数在做什么事情了,首先将传输来的数据读到代表该条连接的redisClient的缓存中querybuf字段中(每次最多读16kB),然后从中解析出命令名称,通过命令名称从之前的server.commands字典中取出redisCommand,然后执行里面的函数。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    //...
    // 读入内容到缓存
    nread = read(fd, c->querybuf+qblen, readlen);
    //...
    // 处理命令
    processInputBuffer(c);
    //..
}

processInputBuffer中会尽可能地读取querybuf字段,并将它解析成字符串数组放到redisClientargv数组中:

void processInputBuffer(redisClient *c) {
    while(sdslen(c->querybuf)) { // 尽可能读取querybuf
        // 将缓冲区中的内容转换成命令,以及命令参数 放到c->argv属性中
        // 因为通过telnet和通过客户端连接,命令格式不同,所以这里需要两种解析
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } 
        
        if (c->argc == 0) {
            //...
        } else {
            // 执行命令,并重置客户端
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

processCommand会通过命令名称(c->argv[0])取到redisCommand结构体并执行:

int processCommand(redisClient *c) {
    //...
    // lookupCommand其实就是去server.commands字典中去找command   L2550
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    
    
    //...
    if {
    //...
    } else {
        // 执行命令  L2766
        call(c,REDIS_CALL_FULL);
    }
} 

向客户端返回数据


​ 在后面实现各个Redis命令的时候,如果需要向客户端写数据,一般都是调用的addReply方法,addReply先在创建了一个写文件事件,然后将要发送的数据写入到该条连接(其实就是redisClient结构体)的写缓存中:

void addReply(redisClient *c, robj *obj) {

    // 创建写文件事件
    if (prepareClientToWrite(c) != REDIS_OK) return;
     
    // 将要发送的数据写入写缓存中
    if (sdsEncodedObject(obj)) {
        //...
    } else if (obj->encoding == REDIS_ENCODING_INT) {
        //...
    }
}

​ 进入prepareClientToWrite方法,你将看到熟悉的aeCreateFileEvent,创建了一个写文件事件,并且回调方法是sendReplyToClient,由这个回调方法负责最终将数据发送给客户端:

int prepareClientToWrite(redisClient *c) {

    //...

    // 创建写文件事件
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;

    return REDIS_OK;
}

​ 对于redisClient,它有两个字段是用来存放写缓冲的,一个buf(就是一个16KB的缓冲),还有一个reply链表,当buf满的时候,会先将回复内容链接在reply后面。

End


下一章将讲解Redis具体功能数据结构的实现。