引言
Redis的事务实现非常简单,本质上就是将命令打包到队列中,到时候一起执行。
进入事务
客户端使用 multi
命令进入事务。代表客户端的 redisClient
结构体有一个 flags
字段表示客户端状态,而 multi
命令本质上就是将 flags
上表示事务状态的标志位置位,表示客户端处于事务状态。multi.c:124:
void multiCommand(redisClient *c) {
// 不能在事务中嵌套事务
if (c->flags & REDIS_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
// 打开事务 FLAG
c->flags |= REDIS_MULTI;
addReply(c,shared.ok);
}
在事务中执行命令
当在事务状态下执行除了exec
, discard
, multi
和watch
以外的命令时,将其加入到命令队列中。redis.c:2755:
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 在事务上下文中
// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外
// 其他所有命令都会被入队到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 正常执行命令
}
在 redisClient
结构体中,有一个 mstate
字段表示事务状态,它是 multiState
结构体对象。redis.h:501:
/*
* 事务状态
*/
typedef struct multiState {
// 事务队列
multiCmd *commands; /* Array of MULTI commands */
// 已入队命令计数
int count; /* Total number of MULTI commands */
} multiState;
在事务状态下执行的命令都会被放到 mstate
中 commands
队列的尾部,并把 count
计数加1。执行的时候会遍历 commands
队列,从头到尾执行所有命令。
exec
很容易可以找到负责执行 exec
命令的函数execCommand,下面对该函数进行拆解
其流程总结如下:
- 如果客户端没有处于“事务状态”,则返回一个错误,因为
exec
只允许在事务状态下执行
// 客户端没有处于事务状态
if (!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
- 如果客户端(
c->flags
)的REDIS_DIRTY_CAS
标志位或者REDIS_DIRTY_EXEC
标志位被置位,则取消事务,这两个标志位的含义分别如下:REDIS_DIRTY_CAS
:客户端 watch 的键被修改REDIS_DIRTY_EXEC
:事务中间有错误的命令
if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
// 取消事务
discardTransaction(c);
goto handle_monitor;
}
- 取消所有 watch 的键
/* Exec all the queued commands */
// 已经可以保证安全性了,取消客户端对所有键的监视
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
- 遍历事务产生的命令队列
c->mstate.commands
,从头到尾执行所有命令
// 执行事务中的命令
for (j = 0; j < c->mstate.count; j++) {
//...
}
watch
Redis 允许我们在事务开始之前,watch
一个或多个键,当事务执行过程中这些键被修改时,exec
直接失败,并返回一个nil
。
先来看看Redis是怎么保存被 watch
的 key 的:
redisClient
有一个watched_keys
列表字段,用来保存该客户端 watch 的所有 key,该数据的主要用处是去重,防止一个客户端多次watch
同一个键,redis.h:664
typedef struct redisClient {
//....
// 被监视的键
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
//...
}
redisDb
也有一个字典类型的watched_keys
字段,该字典的 key 是被监听的键,value是一个 list 类型,表示所有监听该键的客户端,该数据主要用处是当某个键被修改时,可以通过这个字典立即拿到所有键听该键的客户端,将它们的REDIS_DIRTY_CAS
全部置位。redis.h:468
typedef struct redisDb {
//....
// 正在被 WATCH 命令监视的键
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
//...
}
当使用客户端 watch
一个键时,本质上就是先把代表自己的redisClient
以键为字典的key添加到 redisDb
的 watched_keys
字典中,然后在把将听键添加到redisClient
的watched_keys
列表中,multi.c: 323:
void watchForKey(redisClient *c, robj *key) {
// 1. 去重
// 检查 key 是否已经保存在 watched_keys 链表中,
// 如果是的话,直接返回
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
}
// 2. 添加 redisClient 到 redisDb 中
clients = dictFetchValue(c->db->watched_keys,key);
// 如果不存在的话,添加它
if (!clients) {
// 值为链表
clients = listCreate();
// 关联键值对到字典
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
// 将客户端添加到链表的末尾
listAddNodeTail(clients,c);
// 3. 添加监听的键到 reidsClient 中
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
通知键修改
前面已经提到了 Redis 是如何通知键修改的了,现在我们具体去代码里翻一翻。凡是对键进行修改的命令理论上都会进行通知,我们这里就以 set
命令为例,我们翻一翻:setCommand
-> setGenericCommand
-> setKey
-> signalModifiedKey
,最后在 touchWatchedKey
中找到了相关的实现:
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail.
*
* “触碰”一个键,如果这个键正在被某个/某些客户端监视着,
* 那么这个/这些客户端在执行 EXEC 时事务将失败。
*/
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
// 字典为空,没有任何键被监视
if (dictSize(db->watched_keys) == 0) return;
// 获取所有监视这个键的客户端
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
// 遍历所有客户端,打开他们的 REDIS_DIRTY_CAS 标识
listRewind(clients,&li);
while((ln = listNext(&li))) {
redisClient *c = listNodeValue(ln);
c->flags |= REDIS_DIRTY_CAS;
}
}
它通过字典获得所有监听该键的客户端,并将他们的 REDIS_DIRTY_CAS
置位。
再看一下它的上一层函数signalModifiedKey
,上面有注释,大意是”这个一个键空间改动钩子,每当数据库中的键数据被改动时,都会被调用”,db.c:407:
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* 键空间改动的钩子。
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* 每当数据库中的键被改动时, signalModifiedKey() 函数都会被调用。
*
* Every time a DB is flushed the function signalFlushDb() is called.
*
* 每当一个数据库被清空时, signalFlushDb() 都会被调用。
*----------------------------------------------------------------------------*/
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
}
如此一来,所有对键数据有改动的命令都会对监听键的客户端进行通知。
中间命令错误
前面提到过当事务状态下输入错误命令时,会导致客户端的REDIS_DIRTY_EXEC
被置位,从而导致事务失败。
这个操作在检查命令是否存在已经参数是否正确时进行, redis.c:2551:
// 查找命令,并进行命令合法性检查,以及命令参数个数检查
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 没找到指定的命令
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
// 负数表示参数个数要大于等于它的相反数
(c->argc < -c->cmd->arity)) {
// 参数个数错误
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
REDIS_DIRTY_EXEC
的置位操作在flagTransaction
函数中进行:
void flagTransaction(redisClient *c) {
if (c->flags & REDIS_MULTI)
c->flags |= REDIS_DIRTY_EXEC;
}
由此可以看出,只有在命令不存在或者参数个数不存在时,才会导致事务整体失败。如果是运行时错误(比如,键的类型不对,对 string 类型的键调用了操作 hash 键的命令),那么 Redis 是无法立即发现的,到时候 exec
命令执行时,这些命令依旧会被顺序执行,这一条命令的失败对整个事务不会产生任何影响。