如何使用 Redis Pub/Sub 实现即时消息通知功能?

文章导读
Redis Pub/Sub 适合做实时消息通知,但要注意它不持久化消息,客户端断开期间的消息会丢失,适合对实时性要求高但允许短暂数据丢失的场景。
📋 目录
  1. 命令速用版
  2. 核心原理与版本差异
  3. 生产环境代码实现
  4. 怎么验证是否生效
  5. 常见坑与排查
  6. 参考来源
A A

Redis Pub/Sub 适合做实时消息通知,但要注意它不持久化消息,客户端断开期间的消息会丢失,适合对实时性要求高但允许短暂数据丢失的场景。

先说结论:Redis Pub/Sub 能快速搭建即时通知系统,但要知道它不是消息队列,离线消息不会保留。

  • 适合:实时聊天、在线状态推送、系统通知等允许少量消息丢失的场景
  • 先看:订阅者需要保持长连接,网络中断会导致订阅失效
  • 建议:如需消息持久化,结合 Redis Stream 或外部队列使用

命令速用版

打开两个终端窗口快速测试(生产环境建议开启认证):

订阅者终端:

redis-cli -a your_password SUBSCRIBE news

发布者终端:

redis-cli -a your_password PUBLISH news "Hello Redis Pub/Sub!"

订阅者会立即收到消息,格式为:

1) "message"
2) "news"
3) "Hello Redis Pub/Sub!"

查看订阅状态的命令:

PUBSUB CHANNELS          # 查看所有活跃频道
PUBSUB NUMSUB news       # 查看指定频道的订阅者数量
PUBSUB NUMPAT            # 查看模式订阅数量

核心原理与版本差异

Redis Pub/Sub 的核心是频道(Channel)机制。发布者通过 PUBLISH 命令向指定频道发送消息,订阅者通过 SUBSCRIBE 命令监听频道。消息以广播形式传递,所有订阅同一频道的客户端会即时收到数据。

如何使用 Redis Pub/Sub 实现即时消息通知功能?

Redis 内部使用 pubsub_channels 字典存储频道与订阅者的映射关系,键是频道名,值是订阅该频道的客户端列表。发布消息时,Redis 在 O(1) 时间内找到频道,然后遍历订阅者列表发送消息。

注意集群兼容性:

  • Redis 7.0 之前:集群模式下 Pub/Sub 消息仅在当前节点广播,不跨节点同步。若发布者和订阅者连接不同节点,消息会丢失。
  • Redis 7.0+:引入了 Sharded Pub/Sub(分片发布订阅),支持跨节点消息分发,命令变更为 SPUBLISH/SSUBSCRIBE。

生产环境代码实现

第一步:安装 Redis 和客户端

确保 Redis 服务已启动,安装对应语言的客户端库。Python 示例:

pip install redis

第二步:配置 Redis 安全认证

在 redis.conf 中开启密码认证,防止未授权访问:

requirepass your_strong_password
protected-mode yes

第三步:编写发布者代码(含异常处理)

如何使用 Redis Pub/Sub 实现即时消息通知功能?
import redis
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

r = redis.Redis(
    host='localhost', 
    port=6379, 
    db=0,
    password='your_password',  # 生产环境务必配置密码
    decode_responses=True
)

def send_notification(user_id, message):
    try:
        channel = f"notifications.user.{user_id}"
        r.publish(channel, message)
        logger.info(f"消息已发布到 {channel}")
        return True
    except redis.exceptions.RedisError as e:
        logger.error(f"发布失败:{e}")
        return False

第四步:编写订阅者代码(含断线重连)

订阅者需要保持长连接运行,网络中断会导致订阅失效,必须实现重连逻辑:

import redis
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def subscribe_with_retry():
    while True:
        try:
            r = redis.Redis(
                host='localhost', 
                port=6379, 
                password='your_password',
                decode_responses=True,
                socket_timeout=5,
                socket_connect_timeout=5
            )
            pubsub = r.pubsub()
            pubsub.subscribe('chat_channel')
            
            logger.info("订阅成功,等待消息...")
            
            for message in pubsub.listen():
                if message['type'] == 'message':
                    logger.info(f"收到消息:{message['data']}")
                    # 在此处处理业务逻辑
                    
        except redis.exceptions.ConnectionError as e:
            logger.error(f"连接断开:{e},5 秒后重试...")
            time.sleep(5)
        except Exception as e:
            logger.error(f"发生未知错误:{e}")
            time.sleep(5)

if __name__ == "__main__":
    subscribe_with_retry()

第五步:可靠性增强(结合 Redis Stream)

如果需要消息持久化,可同时写入 Stream 和 Pub/Sub:

def send_reliable_notification(user_id, message):
    stream_key = f"stream:user:{user_id}"
    channel = f"notifications.user.{user_id}"
    try:
        # 1. 写入 Stream 持久化
        r.xadd(stream_key, {"msg": message}, maxlen=1000)
        # 2. 发布 Pub/Sub 实时通知
        r.publish(channel, message)
        return True
    except Exception as e:
        logger.error(f"发送失败:{e}")
        return False

怎么验证是否生效

检查订阅状态:

PUBSUB CHANNELS          # 确认频道已创建
PUBSUB NUMSUB news       # 确认有订阅者

监控连接状态:

如何使用 Redis Pub/Sub 实现即时消息通知功能?
INFO clients             # 查看当前客户端连接数
MONITOR                  # 实时查看命令执行(测试环境使用)

测试消息传递:

在订阅者运行时,从另一个终端发布消息,观察订阅者是否立即收到。消息延迟应在毫秒级别。

常见坑与排查

1. 消息丢失问题:Pub/Sub 模型默认不持久化消息,若客户端断开连接,期间的消息会丢失。这一设计以可靠性为代价换取极致性能,适合允许短暂数据丢失的场景。如需持久化,需结合 Redis Stream 或外部队列实现。

2. 连接管理:Redis Pub/Sub 采用 TCP 长连接保持实时通信,网络中断会导致订阅失效。生产环境必须实现自动重连机制(见上方代码示例),并在断开时记录日志。

3. 集群扩展限制:在 Redis 7.0 之前,集群模式下 Pub/Sub 消息不跨节点广播。若使用集群,需确保发布者和订阅者路由到同一节点,或升级至 7.0+ 使用 Sharded Pub/Sub。

4. 高频通知优化:高频通知场景建议使用连接池,消息体尽量精简(如只传 ID 而非完整数据),减少网络传输开销。

5. 安全配置:生产环境务必设置 requirepass 并绑定内网 IP,避免 Pub/Sub 接口被外部扫描利用。

参考来源