引言


Redis 的主从复制功能虽然经常使用,但是心中一直很多困惑。比如能否保证数据的一致性,有没有可能在从服务器上读到过期的数据?从服务器能像主服务器一样保证事务吗?

读完源码后可以很容易地解答上面的问题,我将在文章的最后,列出一些常见的问题,并且从源码角度进行简单解答。

大体上的流程


先理解大体上的流程,会更加容易理解接下来的细节。

从服务器在刚刚成为 Slave 时,会先和 Master 器进行一次大规模的同步,就是直接把主服务器 rdb 文件给拷贝过来,然后加载到自己内存里面。

之后如果 Master 再次接收到更新数据的命令,则会转发一份命令到 Slave(称为“命令传播”)。

如果 Slave 应为各种原因掉线了,然后重新上线,只要中间漏掉的命令不是太多,就可以增量同步中间漏掉的命令,再次达到同步状态。

如下图:

主从复制大体流程

Slave视角


SLAVEOF

使用 SLAVEOF ip port 命令可以把一个 Redis 服务器变成另一个 Redis 服务器的从,所以我们从这里入手研究从服务器的逻辑。replication.c:1694

void slaveofCommand(redisClient *c) {
    //...
    replicationSetMaster(c->argv[1]->ptr, port);
    //...
}

但是仔细研究后发现,除了设置一些属性之外什么都没有干,看起来比较关键的一句是它将代表从服务器当前复制状态的 server.repl_state 设置成了 REDIS_REPL_CONNECTreplication.c:1621

void replicationSetMaster(char *ip, int port) {
    //....
    // 进入连接状态(重点)
    server.repl_state = REDIS_REPL_CONNECT;
    //...
}

既然在 SLAVEOF 命令的逻辑里没有作连接主服务器的操作,那么连接是在什么时候做的呢?

状态转换

其实从服务器是采用状态机的方式来实现的,server.repl_state 就代表当前状态机的状态,redis.h:1182

struct redisServer {
    //...
    // 复制的状态(服务器是从服务器时使用)
    int repl_state;          /* Replication status if the instance is a slave */
    //...
}

有以下几种状态,redis.h:263

// REPL 是 replication 的缩写
// 初始状态
#define REDIS_REPL_NONE 0 /* No active replication */
// 准备连接主服务器
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
// 正在连接主服务器
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
// 已经给主服务器发送了 PING,正在等待 PONG
#define REDIS_REPL_RECEIVE_PONG 3 /* Wait for PING reply */
// 正在和主服务器
#define REDIS_REPL_TRANSFER 4 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 5 /* Connected to master */

在每隔 1s 执行一次的 replicationCron 函数中会根据当前状态进行相应的动作,比如当它发现当前状态是 REDIS_REPL_CONNECT,就会发起对主服务器的连接,replication.c:2225

// replicationCron 中主要就是和从服务器状态转换相关的逻辑
void replicationCron(void) {
    //...
    
    // 尝试连接主服务器
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    
    //...
}

各个状态之间的转换关系如下:

slave 状态转移

之前已经提到了 SLAVEOF 命令会导致状态转移至 REDIS_REPL_CONNECT,下面我们从这个状态开始,逐一研究状态间转移的关键代码。

CONNECT -> CONNECTING

进入刚刚呈现的代码中的 connectWithMaster 函数就可以看到该状态相关的逻辑,大概是异步创建一个到主服务器的连接,并且注册一个连接成功时的事件处理器,然后将状态改为 REDIS_REPL_CONNECTING

// 以非阻塞方式连接主服务器
// CONNECT -> CONNECTING
int connectWithMaster(void) {
    int fd;

    // 连接主服务器
    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    //...

    // 监听主服务器 fd 的读和写事件,并绑定文件事件处理器
    // syncWithMaster 就是处理器
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        //...
    }

    // ...
    // 将状态改为已连接
    server.repl_state = REDIS_REPL_CONNECTING;

    return REDIS_OK;
}

CONNECTING -> RECEIVE_PONG

连接建立成功后,上面注册的 syncWithMaster 处理器就会被调用,然后将状态更新后,发送一个 PING 命令给主服务器,以确保主服务器在线,replication.c:1358

// 从服务器用于同步主服务器的回调函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    if (server.repl_state == REDIS_REPL_CONNECTING) {
         // 更新状态
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
         // 同步发送 PING
        syncWrite(fd,"PING\r\n",6,100);
    }
     // 同步接收 PONG 命令
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        //...
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            //...
        }
        //...
    }
    //...
}

之后如果发现如果需要身份验证的话会先进行身份验证,然后通过 REPLCONF 命令告知主服务器自己监听的端口(目前这一步仅仅是为了告知 Master 自己的监听端口信息,方面主服务器在 INFO 命令中展示,没有实际功能作用),replication.c:1414

    // 进行身份验证
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        //...
    }
    
    {
        sds port = sdsfromlonglong(server.port);
        // 发送命令 REPLCONF listening-port $port
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        //...
    }

最后通过给主服务器发 PSYNC 命令来判断是否需要全量同步,PSYNC 是从服务器用来向主服务器申请同步的命令,在这个命令执行之前,主服务器都只是把它当成普通的客户端,对于刚刚启动的从服务器,发送的命令只可能是 PSYNC ? -1,表示强制进行全量同步(其实就是传 rdb 文件),PSYNC 命令的完整形式是:

replication.c:1447

    // 根据 PSYNC 返回的结果决定是执行部分 resync ,还是 full-resync
    // 主服务器有三种可能性: 1. 同意部分同步 2. 全量同步 3. 旧版本的 Redis,压根不支持 PSYNC 命令
    psync_result = slaveTryPartialResynchronization(fd);

这里 psync_result 有三种可能返回的值:

#define PSYNC_CONTINUE 0   // 主服务器同意部分同步
#define PSYNC_FULLRESYNC 1 // 主服务器要求全量同步
#define PSYNC_NOT_SUPPORTED 2  // 旧版本的 Redis,压根不支持 PSYNC 命令

根据不同的返回值,这里会发生不同的状态转移,对于发送 PSYNC ? -1 的从服务器,只可能返回 PSYNC_FULLRESYNC,然后就会发生 RECEIVE_PONG -> TRANSFER 状态的转移。另一种可能的状态转移待会再聊。

RECEIVE_PONG -> TRANSFER

创建一个名称为 temp-${time}-${pid} 的临时文件用于存放主服务器发送来的 rdb 文件,然后注册一个用于接收 rdb 文件的事件处理器,最后进入 REDIS_REPL_TRANSFER 状态,replication.c:1478

    // 打开一个临时文件,用于写入和保存接下来从主服务器传来的 RDB 文件数据
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        // O_EXCL 表示若文件已存在 则打开出错
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        //...
    }
    //...
    // 注册事件处理器 readSyncBulkPayload
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        //...
    }
    //...
    // 状态转换
    server.repl_state = REDIS_REPL_TRANSFER;

TRANSFER -> CONNECTED

打开刚刚注册的事件处理器 readSyncBulkPayload 函数的代码,我们发现主服务器会在第一行传一个数字,表示要传输的 rdb 文件的大小,然后就如实地传递 rdb 文件的内容了,然后从服务器也如实地将其写进自己刚刚创建的临时文件中:

    // 每次读 4 KB 的一个 Bulk
    char buf[4096];
    // Master 第一行传文件大小
    // 读取 RDB 文件的大小
    if (server.repl_transfer_size == -1) {
        // 同步读取一行
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
            //...
        }
        //...
        // repl_transfer_size 存储 RDB 文件的大小
        server.repl_transfer_size = strtol(buf+1,NULL,10);
    }

    //...
    // 读取一个 Bulk 的 RDB 文件内容 
    nread = read(fd,buf,readlen);
    // 写入之前创建的临时文件
    if (write(server.repl_transfer_fd,buf,nread) != nread) {
        //...
    }
    // 累计读取的字节数
    server.repl_transfer_read += nread;

如果发现读取完毕(其实就是发现累计读取的字节数 server.repl_transfer_read 和 rdb 文件的大小 server.repl_transfer_size 相等),则把刚刚写的临时 rdb 文件加载进内存,给 Master 绑定最普通的客户端事件处理器(这里所谓的“最普通”的事件处理器就是指专门负责处理 Redis 命令的事件处理器,直接把到 Master 当成了普通的命令客户端,因为之后 Master 会通过这条连接将自己执行的命令原封不动地转一份给 Slave),最后状态转换为 REDIS_REPL_CONNECTEDreplication.c:1036

    // 检查 RDB 是否已经传送完毕
    if (server.repl_transfer_read == server.repl_transfer_size) {
        // 载入 RDB
        if (rdbLoad(server.rdb_filename) != REDIS_OK) {
            //...
        }
        //...
        // 给 Master 绑定"最普通"的事件处理器
        server.master = createClient(server.repl_transfer_s);
        
        //...
        // 状态转换
        server.repl_state = REDIS_REPL_CONNECTED;
    }

之后 Slave 只需要把 Master 当成一个普通的客户端,等它源源不断地转发命令过来就行了。

以为到这里就结束了?这是不可能的,因为网络可能出现问题,比如到 Master 的连接超时等,可能会导致新的状态转移。

CONNECT <- CONNECTED

这个状态转移其实是因为与 Master 的连接超时,如果 60s 之内没有接收到 Master 的任何消息(包括心跳和复制的命令),就会导致状态从 REDIS_REPL_CONNECTED 退化到 REDIS_REPL_CONNECTING(所以我把箭头反过来了),这段逻辑可以从之前提到的 replicationCron 函数 中找到,replication.c:2215

    // 从服务器曾经连接上主服务器,但现在超时
    if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
        // 释放主服务器
        freeClient(server.master);
    }

并且在 freeClient -> replicationCacheMaster -> replicationHandleMasterDisconnection 中将状态置为 REDIS_REPL_CONNECT,networking.c:867

// 这个函数在从服务器和主服务器失去联系时调用
void replicationHandleMasterDisconnection(void) {
    //...
    server.repl_state = REDIS_REPL_CONNECT;
    //...
}

RECEIVE_PONG -> CONNECTED

Slave 在和 Master 恢复连接之后,会重新走一遍从 REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG,但是有机会跳过 REDIS_REPL_TRANSFER,直接转移到 REDIS_REPL_CONNECTED,靠得就是之前提到的 PSYNC 命令,如果 Master 响应了 +CONTINUE,那么就可以直接跳到 REDIS_REPL_CONNECTED 了,replication.c:1288

    // 接收到 CONTINUE ,进行 partial resync
    if (!strncmp(reply,"+CONTINUE",9)) {
        //...
        // 在该函数中会将状态转换为 REDIS_REPL_CONNECTED
        replicationResurrectCachedMaster(fd);

        //...
        return PSYNC_CONTINUE;
    }

Master 会在什么情况下响应 +CONTINUE 呢?之后我们会在 Master 视角下详细解读。

Master 视角


上面我们看到,Slave 在复制中是比较主动的,通过一个状态机不断驱动着自己行动,而是 Master 则是一个比较被动的角色,在全量复制时,主要是提供一些命令(比如 PSYNC)给 Slave 调用,在命令传播时,则主要由 Master 的客户端驱动着 Master 进行命令传播,客户端每执行一条更新数据的命令,Master 就转发一份给所有的 slave(注意,这个转发是异步的,所以 Redis 的主从是很有可能数据不一致的)。

命令传播

先研究命令传播有利于后面我们研究 PSYNC 的实现,我们这里先研究命令传播。

在第一次大规模的全量复制结束后,之后每次 Master 接收到更新数据的命令,都会传播一份给 Slave,对于 Slave 来说,Master 就像一个普通的客户端一样。

在该系列的 [第八篇文章 RDB 与 AOF] (http://dqyuan.top/2020/01/03/redis-read-8.html#%E5%91%BD%E4%BB%A4%E7%9A%84%E5%8F%8A%E6%97%B6%E5%AD%98%E5%82%A8) 中我提到过,Redis 每当有更新数据库的命令执行时,就会调用一个叫做 propagate 的函数,在这个函数中,它会将命令传播到 AOF 和 所有的 Slave:

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // 传播到 AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);

    // 传播到 slave
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

负责传播到 slave 的主要就是这个 replicationFeedSlaves 函数了,这个函数主要做两件事,一个是将命令写入 Master 服务器的 backlog 中,另一个是异步转发给所有的 Slave:

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    // 如果 Slave 的数据库都和 Master 不同的话,先生成一个 select db 的命令
    if (server.slaveseldb != dictid) {
        //...
        // 将 SELECT 命令添加到 backlog
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
        // 发送给所有从服务器
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            // 异步发送给从服务器
        }
    }
    
    // 和上面的 SELECT 命令的处理是一样的,也是先添加到 backlog,然后异步发送给所有从服务器
    if (server.repl_backlog) {
        //...
        // 添加到 backlog
        feedReplicationBacklog(aux,len+3);
        //...
    }
    // 异步发送给所有从服务器
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        // 异步发送给从服务器
    }
}

那么这个 backlog 是什么东西呢?

backlog

backlog 是一个默认 1MB 大小的环形缓冲区,用于记录传播给 Slave 的命令,如果 Slave 中间和 Master 失联,恢复连接后,可以利用 backlog 中记录的命令序列进行增量同步。

PSYNC 命令执行的结尾会进行 backlog 的初始化,replication.c:49

// 创建 backlog
void createReplicationBacklog(void) {

    redisAssert(server.repl_backlog == NULL);

    // backlog本体  默认 1 MB
    server.repl_backlog = zmalloc(server.repl_backlog_size);
    
    //...
}

feedReplicationBacklog 函数 中我们可以看到是如何往这个环形缓冲区中写数据的,这个就不仔细多这个代码,直接给几个结论:

PSYNC 增量传输

它首先要判断是否可以增量同步数据,replication.c:560

    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // 尝试进行增量传输
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            //...
             return;
        }

回忆一下该命令的完整格式 PSYNC <master_run_id> <offset>,Master 校验的就是 <master_run_id> 和自己的 run id 是否一致,以及 <offset> 是否在 backlog 可以恢复的范围内,如果这两个条件都符合,就同意增量传输,所谓增量传输,其实就是给 Slave 返回一个 +CONTINUE,并且异步传输 backlog 中记录的在 offset 以来 Slave 丢失的数据,replication.c:410

int masterTryPartialResynchronization(redisClient *c) {
    // 检查 run id
    if (strcasecmp(master_runid, server.runid)) {
        // run id 检查不通过
        //...
        goto need_full_resync;
    }
    // 检查是否在 backlog 可以恢复的范围
    // offset 必须满足
    //  server.repl_backlog_off <= offset  <= server.repl_backlog_off + server.repl_backlog_histlen
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        // backlog 无力回复
        //...
        goto need_full_resync;
    }
    
    // 到此,检查全部完成,说明可以恢复 backlog
    //...
    // Master 视角设置该 Slave 的状态为 REDIS_REPL_ONLINE
    c->replstate = REDIS_REPL_ONLINE;
    // 将该客户端加入 server.slaves
    listAddNodeTail(server.slaves,c);
    // 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        //...
    }
    //...
    // 异步发送 backlog 中 Slave 缺失的增量内容
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    //...
}

如果可以增量传输的话,那么在上面的 masterTryPartialResynchronization 函数返回之后,PSYNC 命令就立即返回了,如果不可以增量传输,才会执行接下来的全量传输逻辑。

PSYNC 全量传输

全量传输中,从 Master 的视角,Slave 也会经历几个状态的转换,但是相对比较简单。

Master 视角的 Slave 状态转移

需要注意的是,在 Master 的 server.slaves 中的每个 slave 都有这么一个状态机。

而且这张图相比 Slave 视角的也没有状态的回退,因为不需要,发现有 Slave 连接超时,直接将其从链表移除就好了,等待 Slave 主动重新建立连接。

前面增量传输的代码已经展示了,如果确认可以增量同步的话就会直接从 REDIS_REPL_NONE 状态跳到 REDIS_REPL_ONLINE 状态,下面来仔细研究全量同步情况小几个状态。

我们可以先思考一下什么情况下才可以发 rdb 文件给 Slave,发送旧的 dump.rdb 是肯定不可以的,因为从旧的 rdb 镜像到目前位置数据库到底经历过哪些更新我是不知道的,所以我只能在 slave 要全量同步时,写一个新的 rdb 传过去,这里你可能又要疑惑了,从开始写新的 rdb 到传输完毕(需要回忆一下,rdb 重写的依据是子进程 fork 的一瞬间的数据库镜像),这期间数据库又可能更新,我又要怎么知道这期间更新了什么呢?这个其实不必要担心,因为在启动了子进程之后,代表 Slave 的 redisClient 就被添加到了 server.slaves中,之后数据库再有更新的话,就会被立即传播到 Slave 的 buf 中,其实这个 buf 就是 redisClient 的异步发送 buf,要等到 Slave 处于 REDIS_REPL_ONLINE 状态后,buf 中的数据才会真正被发送。这么看来数据就不会丢失了。简而言之:需要新写一个 rdb 文件传输给 Slave

如果此时在 Slave 申请复制时,后台已经有一个子进程在重写 rdb 了呢?最简单的方法就是等它结束(置为 REDIS_REPL_WAIT_BGSAVE_START 状态),再重启一个(状态转换为 REDIS_REPL_WAIT_BGSAVE_END)。这个想法还是有待优化的余地的,如果此时刚好有另一个 Slave 处于 REDIS_REPL_WAIT_BGSAVE_END 状态(记这个 Slave 为 s1),那么说明在 s1 的 buf 中记录了从这个正在写的 rdb 到目前位置的所有更新命令,那我们直接把 s1 的 buf 拷贝一份给自己就可以上这一趟 rdb 的车了,也就是说我可以跳过 REDIS_REPL_WAIT_BGSAVE_START 状态,直接进入 REDIS_REPL_WAIT_BGSAVE_END 了。简而言之:只要有个一个 Slave 正在新写 rdb,后面来的 Slave 都可以上这趟车

代码比较复杂,就不详细分析了,理解了上面思路就很容易读懂。

心跳与超时


主到从的心跳

默认每 10s 一次。

对于处于 REDIS_REPL_WAIT_BGSAVE_START 或者 REDIS_REPL_WAIT_BGSAVE_END 状态的 Slave,发送 \n 表示心跳。

对于处于 REDIS_REPL_ONLINE 状态的 Slave,则发送 PING 命令表示心跳。

之所以会有这两种情况,从上面的源码分析中可以看出,在两种状态下, Slave 给主服务器注册的事件处理器是不同的。

replication.c:2254

    if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
        //...
        // 向所有已连接 slave (状态为 ONLINE)发送 PING
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
        //...
        // 向 WAIT_BGSAVE_START 和 REDIS_REPL_WAIT_BGSAVE_END 的 slave 发 \n
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
                slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                if (write(slave->fd, "\n", 1) == -1) {
                    /* Don't worry, it's just a ping. */
                }
            }
        }
    }

从到主的心跳

默认 1s 一次的 REPLCONF ACK <offset> 命令。从服务器通过这个 <offset> 告诉主服务器自己当前的复制偏移,如果在 Master 上使用 INFO 命令将能够看到这个信息。

replication.c:2241:

    if (server.masterhost && server.master &&
        !(server.master->flags & REDIS_PRE_PSYNC))
        replicationSendAck();

常见疑问与解答


主从服务器会出现数据不一致吗?

显然是会的。因为主服务器是异步将命令传播给的从服务器,Redis 也因此获得了超高的性能,我以前简单做过简单的测试,发现 Redis 开启主从和不开启主从,性能基本是一样的。如果你对数据的一致性有很强烈的需求,可以考虑使用 wait 命令,执行 wait n m 命令客户端只有在自己最近执行的一条数据库更新命令被至少 n 个 Slave 复制后(或者 m 秒后超时),才能从该命令返回。

从服务器保证事务吗?

这个问题更加详细的描述时:如果我在主服务器上使用 Redis 的事务(也就是 multiexecwatch等命令),从服务器上有可能看到事务执行一半的状态吗?

答案是保证事务,在从服务器上不会看到事务执行一半的状态。仔细研究源码会发现,被包裹在事务中的命令,只有在 Redis 执行 exec 命令时执行事务中第一条写命令时才会传播一个 multi 命令过去,然后传播所有的写命令,最后将自己(也就是 exec 命令)传播过去,从而保证事务。

multi.c:241

        if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {

            // 传播 MULTI 命令
            execCommandPropagateMulti(c);

            // 计数器,只发送一次 multi 命令
            must_propagate = 1;
        }