1. 增加消费者实例:最直接的方法是横向扩展消费者,通过部署多个消费者实例来并行处理消息积压。每个实例订阅相同的频道,Redis会自动将消息分发给所有订阅者,实现负载均衡。
方案一:消费者水平扩展
当消息积压严重时,首要优化是增加消费者数量。启动多个消费者进程,每个进程订阅相同的channel,Redis Pub/Sub 会将消息广播给所有订阅者,从而提高整体消费能力。代码示例:消费者脚本中使用 redis.subscribe('channel'),多进程运行即可。
方案二:使用Stream替代Pub/Sub
Redis Stream 支持消费者组(Consumer Group),类似于Kafka,能实现消息的有序消费和ACK确认,避免消息丢失和重复消费。使用 XREADGROUP 命令读取消息,XACK 确认已处理,积压消息可被其他消费者接管。
配置消费者组:XGROUP CREATE mystream mygroup $ MKSTREAM,然后消费者循环调用 XREADGROUP BLOCK 0 10 mygroup consumer1 >。
方案三:消息持久化和限流
开启 AOF 或 RDB 持久化,确保消息不丢失。同时,在发布端添加限流机制,避免瞬间消息洪峰导致积压。使用令牌桶算法控制发布速率,例如每秒最多100条消息。
方案四:分频道和优先级队列
将消息按优先级分到不同频道,高优先级频道优先消费。或者结合 Sorted Set 作为优先级队列,消费者轮询 ZPOPMIN 获取高优先级消息。
实际案例:某电商平台通过多消费者+Stream,消息积压从10万降到零,延迟从分钟级到毫秒级。
方案五:监控和自动扩缩容
使用 Redis INFO 和 MONITOR 监控订阅数和积压情况,结合 Prometheus + Grafana 报警。当积压超过阈值,自动启动新消费者实例,实现弹性扩展。
Q: 为什么Redis Pub/Sub 会消息积压?
A: 消费者处理慢于生产者速度,或消费者宕机未及时恢复,导致消息在内存中堆积。
Q: Stream 和 Pub/Sub 哪个更好?
A: Stream 更可靠,支持持久化、消费者组和ACK,适合生产环境;Pub/Sub 简单但无持久化。
Q: 如何快速清空积压消息?
A: 对于Stream,使用 XPENDING 查看待确认消息,XCLAIM 转移给其他消费者;极端情况用 DEL 删除Stream。
Q: 优化后如何验证效果?
A: 监控 XLEN(流长度)、消费延迟和系统负载,确保积压为0且延迟<1s。