引言
发布订阅看上去比较神奇,但是实现其实非常简单。
订阅
Redis中支持两种订阅,一种是订阅一个单独的频道(subscribe
命令),另外一种是用类似正则的表达式订阅一组频道(psubscribe
命令)。
订阅单个频道
# 订阅msg频道
subscribe msg
这个命令做的事情其实非常简单,就是先往redisClient
结构体的pubsub_channels
字典里加入订阅的channel的名字(比如上面的命令,会以msg
为key,NULL为value),然后往redisServer
结构体的pubsub_channels
字典里加入订阅该channel的redisClient
(以channel名为key,redisClient
链表为value),pubsub.c: 163:
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
// 将channel加入redisClient的pubsub_channels字典中
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
//...
// 将channel加入redisServer的pubsub_channels中
dictAdd(server.pubsub_channels,channel,clients);
//...
}
}
// 回复客户端
//....
}
redisClient
的pubsub_channels
字段是一个value都为NULL的字典,本质是一个集合,这个字段主要作用就是去重,保证客户端即使多次订阅同一个channel,也只添加一次。redis.h:669:
typedef struct redisClient {
//...
dict *pubsub_channels;
//...
}
而redisServer
中的pubsub_channels
作用则比较大,它也是一个字典,key是channel的命令,value则是所有订阅该channel的redisClient
链表,之后在publish
的实现中我们可以看到,publish
其实就是在遍历这个链表,然后往链表里的每一个client发消息。redis.h:1263:
struct redisServer {
//...
dict *pubsub_channels;
//...
}
订阅一组频道
# 发布消息到 news.it 或者 news.ee 等等 该客户端都能收到
psubscribe news.*
流程和subscribe
差不多,都是把channel加到redisClient中,然后把自己(redisClient
)加入到redisServer
中,只是加的字段不一样,这次是分别加到pubsub_patterns
中,pubsub.c:163:
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
// 确保没有和已经添加的正则表达式重复
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
pubsubPattern *pat;
// 将新的正则加入到pubsub_patterns链表的尾部
listAddNodeTail(c->pubsub_patterns,pattern);
//...
// 同样添加到redisServer的pubsub_patterns链表的尾部
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
}
//回复客户端
//...
}
这里redisClient
的pubsub_patterns
的作用虽然也是去重,但是确是用链表实现的,可能它的开发者们默认不会有太多的正则订阅(正常情况下一个也就够用了)。
从上面的代码可以看出,redisServer
的pubsub_patterns
字段虽然也是链表,但是确实pubsubPattern
链表,pubsubPattern
结构体如下,redis.h:1326:
typedef struct pubsubPattern {
// 订阅的客户端
redisClient *client;
// 订阅的正则表达式
robj *pattern;
} pubsubPattern;
里面有订阅的客户端和订阅的正则表达式,在publish
的时候,会遍历这个链表,用正则表达式一个个进行匹配,匹配上了就发现消息给相应的client
。
发布
publish msg "good morning"
上面将订阅的时候已经顺带着讲了,publish
其实就是两步:
- 先去
redisServer
的pubsub_channels
字典中以发布的channel名为key,取出所有订阅该channel的redisClient
链表,遍历该链表,给每个client发一条消息 - 然后遍历
redisServer
的pubsub_patterns
链表,用里面的每一个元素(上面介绍的pubsubPattern
结构体)的pattern
字段(正则表达式)匹配一下发布的channel名(比如上面的msg
),匹配上了就给该client发布一条消息
代码在pubsub.c: 391:
int pubsubPublishMessage(robj *channel, robj *message) {
// 第一步
de = dictFind(server.pubsub_channels,channel);
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {// 遍历
// 给客户端发消息
//...
}
// 第二步
if (listLength(server.pubsub_patterns)) { // 存在正则表达式订阅
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {// 遍历
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) { // 匹配成功
// 给客户端发消息
}
}
}
//...
}
Redis中这个正则其实不是标准的正则表达式语法,而是自己实现了部分的正则语法,支持*
,?
,[]
,以及支持使用\
转义,如果想看其实现可以去stringmatchlen:
int stringmatchlen(const char *pattern, int patternLen,
const char *string, int stringLen, int nocase)
{
while(patternLen) {
switch(pattern[0]) {
case '*':
//...
break;
case '?':
//...
break;
case '[':
//...
break;
case '\\':
//...
default:
//...
break;
}
//....
}
}