Redis Pub/Sub 适合做实时消息通知,但要注意它不持久化消息,客户端断开期间的消息会丢失,适合对实时性要求高但允许短暂数据丢失的场景。
先说结论:Redis Pub/Sub 能快速搭建即时通知系统,但要知道它不是消息队列,离线消息不会保留。
- 适合:实时聊天、在线状态推送、系统通知等允许少量消息丢失的场景
- 先看:订阅者需要保持长连接,网络中断会导致订阅失效
- 建议:如需消息持久化,结合 Redis Stream 或外部队列使用
命令速用版
打开两个终端窗口快速测试(生产环境建议开启认证):
订阅者终端:
redis-cli -a your_password SUBSCRIBE news
发布者终端:
redis-cli -a your_password PUBLISH news "Hello Redis Pub/Sub!"
订阅者会立即收到消息,格式为:
1) "message" 2) "news" 3) "Hello Redis Pub/Sub!"
查看订阅状态的命令:
PUBSUB CHANNELS # 查看所有活跃频道 PUBSUB NUMSUB news # 查看指定频道的订阅者数量 PUBSUB NUMPAT # 查看模式订阅数量
核心原理与版本差异
Redis Pub/Sub 的核心是频道(Channel)机制。发布者通过 PUBLISH 命令向指定频道发送消息,订阅者通过 SUBSCRIBE 命令监听频道。消息以广播形式传递,所有订阅同一频道的客户端会即时收到数据。
Redis 内部使用 pubsub_channels 字典存储频道与订阅者的映射关系,键是频道名,值是订阅该频道的客户端列表。发布消息时,Redis 在 O(1) 时间内找到频道,然后遍历订阅者列表发送消息。
注意集群兼容性:
- Redis 7.0 之前:集群模式下 Pub/Sub 消息仅在当前节点广播,不跨节点同步。若发布者和订阅者连接不同节点,消息会丢失。
- Redis 7.0+:引入了 Sharded Pub/Sub(分片发布订阅),支持跨节点消息分发,命令变更为 SPUBLISH/SSUBSCRIBE。
生产环境代码实现
第一步:安装 Redis 和客户端
确保 Redis 服务已启动,安装对应语言的客户端库。Python 示例:
pip install redis
第二步:配置 Redis 安全认证
在 redis.conf 中开启密码认证,防止未授权访问:
requirepass your_strong_password protected-mode yes
第三步:编写发布者代码(含异常处理)
import redis
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
r = redis.Redis(
host='localhost',
port=6379,
db=0,
password='your_password', # 生产环境务必配置密码
decode_responses=True
)
def send_notification(user_id, message):
try:
channel = f"notifications.user.{user_id}"
r.publish(channel, message)
logger.info(f"消息已发布到 {channel}")
return True
except redis.exceptions.RedisError as e:
logger.error(f"发布失败:{e}")
return False第四步:编写订阅者代码(含断线重连)
订阅者需要保持长连接运行,网络中断会导致订阅失效,必须实现重连逻辑:
import redis
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def subscribe_with_retry():
while True:
try:
r = redis.Redis(
host='localhost',
port=6379,
password='your_password',
decode_responses=True,
socket_timeout=5,
socket_connect_timeout=5
)
pubsub = r.pubsub()
pubsub.subscribe('chat_channel')
logger.info("订阅成功,等待消息...")
for message in pubsub.listen():
if message['type'] == 'message':
logger.info(f"收到消息:{message['data']}")
# 在此处处理业务逻辑
except redis.exceptions.ConnectionError as e:
logger.error(f"连接断开:{e},5 秒后重试...")
time.sleep(5)
except Exception as e:
logger.error(f"发生未知错误:{e}")
time.sleep(5)
if __name__ == "__main__":
subscribe_with_retry()第五步:可靠性增强(结合 Redis Stream)
如果需要消息持久化,可同时写入 Stream 和 Pub/Sub:
def send_reliable_notification(user_id, message):
stream_key = f"stream:user:{user_id}"
channel = f"notifications.user.{user_id}"
try:
# 1. 写入 Stream 持久化
r.xadd(stream_key, {"msg": message}, maxlen=1000)
# 2. 发布 Pub/Sub 实时通知
r.publish(channel, message)
return True
except Exception as e:
logger.error(f"发送失败:{e}")
return False怎么验证是否生效
检查订阅状态:
PUBSUB CHANNELS # 确认频道已创建 PUBSUB NUMSUB news # 确认有订阅者
监控连接状态:
INFO clients # 查看当前客户端连接数 MONITOR # 实时查看命令执行(测试环境使用)
测试消息传递:
在订阅者运行时,从另一个终端发布消息,观察订阅者是否立即收到。消息延迟应在毫秒级别。
常见坑与排查
1. 消息丢失问题:Pub/Sub 模型默认不持久化消息,若客户端断开连接,期间的消息会丢失。这一设计以可靠性为代价换取极致性能,适合允许短暂数据丢失的场景。如需持久化,需结合 Redis Stream 或外部队列实现。
2. 连接管理:Redis Pub/Sub 采用 TCP 长连接保持实时通信,网络中断会导致订阅失效。生产环境必须实现自动重连机制(见上方代码示例),并在断开时记录日志。
3. 集群扩展限制:在 Redis 7.0 之前,集群模式下 Pub/Sub 消息不跨节点广播。若使用集群,需确保发布者和订阅者路由到同一节点,或升级至 7.0+ 使用 Sharded Pub/Sub。
4. 高频通知优化:高频通知场景建议使用连接池,消息体尽量精简(如只传 ID 而非完整数据),减少网络传输开销。
5. 安全配置:生产环境务必设置 requirepass 并绑定内网 IP,避免 Pub/Sub 接口被外部扫描利用。