如何使用 Redis List 实现简单的生产者消费者模型?

文章导读
使用 Redis List 实现生产者消费者模型,最适合处理异步任务队列和解耦业务逻辑。生产者通过 LPUSH 写入任务,消费者通过 BRPOP 阻塞读取,避免空轮询消耗 CPU。
📋 目录
  1. 核心原理
  2. Python 代码实现
  3. 可靠性配置
  4. 验证方法
  5. 常见风险与排查
  6. 参考来源
A A

使用 Redis List 实现生产者消费者模型,最适合处理异步任务队列和解耦业务逻辑。生产者通过 LPUSH 写入任务,消费者通过 BRPOP 阻塞读取,避免空轮询消耗 CPU。

先说结论:Redis List 结构简单,适合轻量级消息队列,但需留意消息可靠性

  • 适合:异步解耦、任务分发、流量削峰
  • 核心命令:LPUSH 搭配 BRPOP,避免空轮询
  • 风险:消费者宕机可能导致消息丢失,关键业务需配合持久化或重试机制
  • 建议:生产环境开启 AOF 持久化,代码层增加异常捕获与重试

核心原理

Redis List 本质是一个双向链表,支持从头部或尾部插入弹出。生产者从左侧 push(LPUSH),消费者从右侧 pop(RPOP),天然形成先进先出(FIFO)队列。关键在于 BRPOP 命令,当队列为空时连接会阻塞等待,而不是立即返回空,这避免了消费者线程频繁空转消耗 CPU。

Python 代码实现

以下示例基于 Python redis 库,展示完整的生产者与消费者逻辑,包含异常处理。

1. 生产者代码

import redis
import time
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def produce_task(task_id):
    task = {
        "id": task_id,
        "content": "process_data"
    }
    # 序列化消息
    msg = json.dumps(task)
    # 写入队列左侧
    r.lpush("myqueue", msg)
    print(f"Produced: {task_id}")

if __name__ == "__main__":
    for i in range(10):
        produce_task(i)
        time.sleep(0.5)

2. 消费者代码(含异常重试)

Redis List 原生不支持 ACK 机制。为防止消费者处理业务时宕机导致消息丢失,需在代码层捕获异常并将消息重新推入队列。

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def process_task(msg):
    # 模拟业务逻辑,可能抛出异常
    data = json.loads(msg)
    if data["id"] % 5 == 0:
        raise Exception("Simulated Processing Error")
    print(f"Processed: {data['id']}")

def consume_task():
    while True:
        try:
            # 阻塞读取,超时时间设为 0 表示无限等待
            result = r.brpop("myqueue", timeout=0)
            if result:
                queue_name, msg = result
                try:
                    process_task(msg)
                except Exception as e:
                    print(f"Processing failed: {e}, re-queueing...")
                    # 异常时将消息重新推入队列头部,避免丢失
                    r.lpush("myqueue", msg)
                    # 避免频繁重试导致死循环,可加入适当休眠
                    time.sleep(1)
        except redis.exceptions.ConnectionError:
            print("Redis connection lost, reconnecting...")
            time.sleep(5)

if __name__ == "__main__":
    consume_task()

可靠性配置

Redis 默认配置下数据可能仅存储在内存中,重启会导致队列数据丢失。生产环境建议开启 AOF 持久化。

1. 临时开启(命令行)

CONFIG SET appendonly yes
CONFIG SET appendfsync everysec

2. 永久开启(redis.conf)

appendonly yes
appendfsync everysec

配置完成后需重启 Redis 服务生效。appendfsync 建议设置为 everysec,兼顾性能与安全性。

验证方法

1. 观察日志:消费者终端应打印"Processed"日志,若出现"Processing failed"则说明重试机制生效。

如何使用 Redis List 实现简单的生产者消费者模型?

2. 检查队列长度:使用 Redis 命令行工具执行 LLEN myqueue。若消费者处理速度快于生产,队列长度应维持在低位或为零。

3. 模拟宕机:在消费者处理过程中强制停止进程,重启后检查消息是否因持久化配置而保留,或因重试机制被重新处理。

常见风险与排查

1. 消息丢失风险
场景:消费者 BRPOP 取出消息后,在处理业务逻辑时宕机,消息已出队但未处理完成。
对策:代码层增加 try-catch 重试机制;关键业务建议评估使用 Redis Stream(支持 ACK)或专业消息队列(如 RabbitMQ)。

2. 空轮询消耗
场景:若使用 RPOP 而非 BRPOP 且无休眠控制,会持续占用 CPU 资源。
对策:务必使用 BRPOP 并设置合理 timeout。

3. 多消费者竞争
场景:多个消费者监听同一列表时,每条消息只会被其中一个消费者获取。
对策:适合任务分发,不适合广播。若需广播,需为每个消费者创建独立队列。

4. 队列积压
场景:生产速度远大于消费速度。
对策:监控 LLEN 指标,设置告警阈值;增加消费者实例数量。

参考来源

  • Redis 官方文档 - List 数据类型
  • Redis 持久化配置指南
  • 阿里云开发者社区 - Redis 消息队列实践