引言


发布订阅看上去比较神奇,但是实现其实非常简单。

订阅


Redis中支持两种订阅,一种是订阅一个单独的频道(subscribe命令),另外一种是用类似正则的表达式订阅一组频道(psubscribe命令)。

订阅单个频道

# 订阅msg频道
subscribe msg

这个命令做的事情其实非常简单,就是先往redisClient结构体的pubsub_channels字典里加入订阅的channel的名字(比如上面的命令,会以msg为key,NULL为value),然后往redisServer结构体的pubsub_channels字典里加入订阅该channel的redisClient(以channel名为key,redisClient链表为value),pubsub.c: 163

int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    // 将channel加入redisClient的pubsub_channels字典中
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            //...
            // 将channel加入redisServer的pubsub_channels中
             dictAdd(server.pubsub_channels,channel,clients);
            //...
        }
    }
    
    // 回复客户端
    //....
}

redisClientpubsub_channels字段是一个value都为NULL的字典,本质是一个集合,这个字段主要作用就是去重,保证客户端即使多次订阅同一个channel,也只添加一次。redis.h:669

typedef struct redisClient {
    //...
    dict *pubsub_channels;
    //...
}

redisServer中的pubsub_channels作用则比较大,它也是一个字典,key是channel的命令,value则是所有订阅该channel的redisClient链表,之后在publish的实现中我们可以看到,publish其实就是在遍历这个链表,然后往链表里的每一个client发消息。redis.h:1263

struct redisServer {
    //...
    dict *pubsub_channels;
    //...
}

订阅一组频道

# 发布消息到 news.it 或者 news.ee 等等 该客户端都能收到
psubscribe news.*

流程和subscribe差不多,都是把channel加到redisClient中,然后把自己(redisClient)加入到redisServer中,只是加的字段不一样,这次是分别加到pubsub_patterns中,pubsub.c:163

int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    // 确保没有和已经添加的正则表达式重复
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        pubsubPattern *pat;
        // 将新的正则加入到pubsub_patterns链表的尾部
        listAddNodeTail(c->pubsub_patterns,pattern);
        //...
        // 同样添加到redisServer的pubsub_patterns链表的尾部
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    
    //回复客户端
    //...
}

这里redisClientpubsub_patterns的作用虽然也是去重,但是确是用链表实现的,可能它的开发者们默认不会有太多的正则订阅(正常情况下一个也就够用了)。

从上面的代码可以看出,redisServerpubsub_patterns字段虽然也是链表,但是确实pubsubPattern链表,pubsubPattern结构体如下,redis.h:1326

typedef struct pubsubPattern {

    // 订阅的客户端
    redisClient *client;

    // 订阅的正则表达式
    robj *pattern;

} pubsubPattern;

里面有订阅的客户端和订阅的正则表达式,在publish的时候,会遍历这个链表,用正则表达式一个个进行匹配,匹配上了就发现消息给相应的client

发布


publish msg "good morning"

上面将订阅的时候已经顺带着讲了,publish其实就是两步:

  1. 先去redisServerpubsub_channels字典中以发布的channel名为key,取出所有订阅该channel的redisClient链表,遍历该链表,给每个client发一条消息
  2. 然后遍历redisServerpubsub_patterns链表,用里面的每一个元素(上面介绍的pubsubPattern结构体)的pattern字段(正则表达式)匹配一下发布的channel名(比如上面的msg),匹配上了就给该client发布一条消息

代码在pubsub.c: 391

int pubsubPublishMessage(robj *channel, robj *message) {
    // 第一步
    de = dictFind(server.pubsub_channels,channel);
    listRewind(list,&li);
    while ((ln = listNext(&li)) != NULL) {// 遍历
        // 给客户端发消息
        //...
    }
    // 第二步
    if (listLength(server.pubsub_patterns)) { // 存在正则表达式订阅
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {// 遍历
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) { // 匹配成功
                // 给客户端发消息
            }
        }
    }
    
    //...
}

Redis中这个正则其实不是标准的正则表达式语法,而是自己实现了部分的正则语法,支持*?[],以及支持使用\转义,如果想看其实现可以去stringmatchlen

int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            //...
            break;
        case '?':
            //...
            break;
        case '[':
            //...
            break;
        case '\\':
            //...
        default:
            //...
            break;
        }
        //....
    }
}