引言


Redis的事务实现非常简单,本质上就是将命令打包到队列中,到时候一起执行。

进入事务


客户端使用 multi 命令进入事务。代表客户端的 redisClient 结构体有一个 flags 字段表示客户端状态,而 multi 命令本质上就是将 flags 上表示事务状态的标志位置位,表示客户端处于事务状态。multi.c:124

void multiCommand(redisClient *c) {

    // 不能在事务中嵌套事务
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }

    // 打开事务 FLAG
    c->flags |= REDIS_MULTI;

    addReply(c,shared.ok);
}

在事务中执行命令


当在事务状态下执行除了exec, discard, multiwatch以外的命令时,将其加入到命令队列中。redis.c:2755

    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // 在事务上下文中
        // 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外
        // 其他所有命令都会被入队到事务队列中
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 正常执行命令
    }

redisClient 结构体中,有一个 mstate 字段表示事务状态,它是 multiState 结构体对象。redis.h:501

/*
 * 事务状态
 */
typedef struct multiState {

    // 事务队列
    multiCmd *commands;     /* Array of MULTI commands */

    // 已入队命令计数
    int count;              /* Total number of MULTI commands */
} multiState;

在事务状态下执行的命令都会被放到 mstatecommands 队列的尾部,并把 count 计数加1。执行的时候会遍历 commands 队列,从头到尾执行所有命令。

exec


很容易可以找到负责执行 exec 命令的函数execCommand,下面对该函数进行拆解

其流程总结如下:

    // 客户端没有处于事务状态
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {

        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);

        // 取消事务
        discardTransaction(c);

        goto handle_monitor;
    }
    /* Exec all the queued commands */
    // 已经可以保证安全性了,取消客户端对所有键的监视
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    // 执行事务中的命令
    for (j = 0; j < c->mstate.count; j++) {
        //...
    }

watch


Redis 允许我们在事务开始之前,watch一个或多个键,当事务执行过程中这些键被修改时,exec直接失败,并返回一个nil

先来看看Redis是怎么保存被 watch 的 key 的:

typedef struct redisClient {
    //....
    // 被监视的键
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    //...
}
typedef struct redisDb {
    //....
    // 正在被 WATCH 命令监视的键
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    //...
}

当使用客户端 watch 一个键时,本质上就是先把代表自己的redisClient以键为字典的key添加到 redisDbwatched_keys 字典中,然后在把将听键添加到redisClientwatched_keys列表中,multi.c: 323

void watchForKey(redisClient *c, robj *key) {
    // 1. 去重
    // 检查 key 是否已经保存在 watched_keys 链表中,
    // 如果是的话,直接返回
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    
    // 2. 添加 redisClient 到 redisDb 中
    clients = dictFetchValue(c->db->watched_keys,key);
    // 如果不存在的话,添加它
    if (!clients) { 
        // 值为链表
        clients = listCreate();
        // 关联键值对到字典
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 将客户端添加到链表的末尾
    listAddNodeTail(clients,c);
    
    // 3. 添加监听的键到 reidsClient 中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

通知键修改


前面已经提到了 Redis 是如何通知键修改的了,现在我们具体去代码里翻一翻。凡是对键进行修改的命令理论上都会进行通知,我们这里就以 set 命令为例,我们翻一翻:setCommand -> setGenericCommand -> setKey -> signalModifiedKey,最后在 touchWatchedKey 中找到了相关的实现:

/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. 
 *
 * “触碰”一个键,如果这个键正在被某个/某些客户端监视着,
 * 那么这个/这些客户端在执行 EXEC 时事务将失败。
 */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    // 字典为空,没有任何键被监视
    if (dictSize(db->watched_keys) == 0) return;

    // 获取所有监视这个键的客户端
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
    // 遍历所有客户端,打开他们的 REDIS_DIRTY_CAS 标识
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        redisClient *c = listNodeValue(ln);

        c->flags |= REDIS_DIRTY_CAS;
    }
}

它通过字典获得所有监听该键的客户端,并将他们的 REDIS_DIRTY_CAS 置位。

再看一下它的上一层函数signalModifiedKey,上面有注释,大意是”这个一个键空间改动钩子,每当数据库中的键数据被改动时,都会被调用”,db.c:407

/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * 键空间改动的钩子。
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * 每当数据库中的键被改动时, signalModifiedKey() 函数都会被调用。
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *
 * 每当一个数据库被清空时, signalFlushDb() 都会被调用。
 *----------------------------------------------------------------------------*/
void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

如此一来,所有对键数据有改动的命令都会对监听键的客户端进行通知。

中间命令错误


前面提到过当事务状态下输入错误命令时,会导致客户端的REDIS_DIRTY_EXEC被置位,从而导致事务失败。

这个操作在检查命令是否存在已经参数是否正确时进行, redis.c:2551

    // 查找命令,并进行命令合法性检查,以及命令参数个数检查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 没找到指定的命令
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               // 负数表示参数个数要大于等于它的相反数
               (c->argc < -c->cmd->arity)) {
        // 参数个数错误
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

REDIS_DIRTY_EXEC的置位操作在flagTransaction函数中进行:

void flagTransaction(redisClient *c) {
    if (c->flags & REDIS_MULTI)
        c->flags |= REDIS_DIRTY_EXEC;
}

由此可以看出,只有在命令不存在或者参数个数不存在时,才会导致事务整体失败。如果是运行时错误(比如,键的类型不对,对 string 类型的键调用了操作 hash 键的命令),那么 Redis 是无法立即发现的,到时候 exec 命令执行时,这些命令依旧会被顺序执行,这一条命令的失败对整个事务不会产生任何影响。