使用 Redis List 实现生产者消费者模型,最适合处理异步任务队列和解耦业务逻辑。生产者通过 LPUSH 写入任务,消费者通过 BRPOP 阻塞读取,避免空轮询消耗 CPU。
先说结论:Redis List 结构简单,适合轻量级消息队列,但需留意消息可靠性
- 适合:异步解耦、任务分发、流量削峰
- 核心命令:LPUSH 搭配 BRPOP,避免空轮询
- 风险:消费者宕机可能导致消息丢失,关键业务需配合持久化或重试机制
- 建议:生产环境开启 AOF 持久化,代码层增加异常捕获与重试
核心原理
Redis List 本质是一个双向链表,支持从头部或尾部插入弹出。生产者从左侧 push(LPUSH),消费者从右侧 pop(RPOP),天然形成先进先出(FIFO)队列。关键在于 BRPOP 命令,当队列为空时连接会阻塞等待,而不是立即返回空,这避免了消费者线程频繁空转消耗 CPU。
Python 代码实现
以下示例基于 Python redis 库,展示完整的生产者与消费者逻辑,包含异常处理。
1. 生产者代码
import redis
import time
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def produce_task(task_id):
task = {
"id": task_id,
"content": "process_data"
}
# 序列化消息
msg = json.dumps(task)
# 写入队列左侧
r.lpush("myqueue", msg)
print(f"Produced: {task_id}")
if __name__ == "__main__":
for i in range(10):
produce_task(i)
time.sleep(0.5)2. 消费者代码(含异常重试)
Redis List 原生不支持 ACK 机制。为防止消费者处理业务时宕机导致消息丢失,需在代码层捕获异常并将消息重新推入队列。
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def process_task(msg):
# 模拟业务逻辑,可能抛出异常
data = json.loads(msg)
if data["id"] % 5 == 0:
raise Exception("Simulated Processing Error")
print(f"Processed: {data['id']}")
def consume_task():
while True:
try:
# 阻塞读取,超时时间设为 0 表示无限等待
result = r.brpop("myqueue", timeout=0)
if result:
queue_name, msg = result
try:
process_task(msg)
except Exception as e:
print(f"Processing failed: {e}, re-queueing...")
# 异常时将消息重新推入队列头部,避免丢失
r.lpush("myqueue", msg)
# 避免频繁重试导致死循环,可加入适当休眠
time.sleep(1)
except redis.exceptions.ConnectionError:
print("Redis connection lost, reconnecting...")
time.sleep(5)
if __name__ == "__main__":
consume_task()可靠性配置
Redis 默认配置下数据可能仅存储在内存中,重启会导致队列数据丢失。生产环境建议开启 AOF 持久化。
1. 临时开启(命令行)
CONFIG SET appendonly yes
CONFIG SET appendfsync everysec2. 永久开启(redis.conf)
appendonly yes
appendfsync everysec配置完成后需重启 Redis 服务生效。appendfsync 建议设置为 everysec,兼顾性能与安全性。
验证方法
1. 观察日志:消费者终端应打印"Processed"日志,若出现"Processing failed"则说明重试机制生效。
2. 检查队列长度:使用 Redis 命令行工具执行 LLEN myqueue。若消费者处理速度快于生产,队列长度应维持在低位或为零。
3. 模拟宕机:在消费者处理过程中强制停止进程,重启后检查消息是否因持久化配置而保留,或因重试机制被重新处理。
常见风险与排查
1. 消息丢失风险
场景:消费者 BRPOP 取出消息后,在处理业务逻辑时宕机,消息已出队但未处理完成。
对策:代码层增加 try-catch 重试机制;关键业务建议评估使用 Redis Stream(支持 ACK)或专业消息队列(如 RabbitMQ)。
2. 空轮询消耗
场景:若使用 RPOP 而非 BRPOP 且无休眠控制,会持续占用 CPU 资源。
对策:务必使用 BRPOP 并设置合理 timeout。
3. 多消费者竞争
场景:多个消费者监听同一列表时,每条消息只会被其中一个消费者获取。
对策:适合任务分发,不适合广播。若需广播,需为每个消费者创建独立队列。
4. 队列积压
场景:生产速度远大于消费速度。
对策:监控 LLEN 指标,设置告警阈值;增加消费者实例数量。
参考来源
- Redis 官方文档 - List 数据类型
- Redis 持久化配置指南
- 阿里云开发者社区 - Redis 消息队列实践