Redis 原生并不像专业 MQ 那样自带完善的死信队列功能,避免堆积的核心在于使用 Redis Stream 结构配合消费者组,并通过脚本控制重试次数,超时未确认的消息需手动转入死信流。
先说结论:基于 Redis Stream 实现消息确认与重试,配合 Lua 脚本管理最大重试次数,是避免死信堆积的可行方案。
- 适合:基于 Redis 构建轻量级可靠队列且已使用 Redis 5.0+ 版本的场景。
- 先看:确认业务是否能接受最终一致性,以及 Redis 内存成本。
- 建议:重试次数不宜过多,死信消息需定期清理或转入外部存储分析。
- 关键:必须使用独立 Key 存储重试计数,并配合 Lua 保证原子性。
核心设计方案
重试计数的存储不能依赖消息体本身,因为 Stream 消息不可变。推荐方案是使用独立的 Hash 结构存储重试次数。
Key 命名规范:retry_count:{stream_name}:{message_id}
逻辑流程:
- 消费者获取消息后处理失败。
- 查询重试计数 Key,若不存在则初始化为 1,若存在则 +1。
- 若计数超过阈值(如 3 次),将消息内容写入死信 Stream,并删除重试计数 Key。
- 若未超过阈值,不 ACK,等待下一次 XPENDING 扫描或客户端重试。
核心 Lua 脚本实现
为了保证重试计数检查与死信转移的原子性,避免并发导致计数错误或消息丢失,必须使用 Lua 脚本。以下脚本实现检查计数、递增及转移逻辑:
-- KEYS[1]: 重试计数 Key (retry_count:stream:id)
-- KEYS[2]: 死信队列 Stream (dead_letter_stream)
-- ARGV[1]: 最大重试次数
-- ARGV[2]: 消息内容 (序列化后的字符串)
-- ARGV[3]: 消息 ID
local count = tonumber(redis.call('GET', KEYS[1]) or "0")
count = count + 1
if count > tonumber(ARGV[1]) then
-- 超过阈值,写入死信队列
redis.call('XADD', KEYS[2], '*', 'origin_id', ARGV[3], 'content', ARGV[2])
-- 清理重试计数
redis.call('DEL', KEYS[1])
return 1 -- 表示已转入死信
else
-- 未超过阈值,更新计数
redis.call('SET', KEYS[1], count)
-- 设置过期时间,防止脏数据堆积 (例如 1 天)
redis.call('EXPIRE', KEYS[1], 86400)
return 0 -- 表示继续重试
end消费者端代码实战
以下 Python 示例展示了如何在业务逻辑中集成上述重试机制。注意异常捕获与 Lua 脚本的调用时机。
import redis
import json
r = redis.Redis(host='localhost', port=6379)
lua_script = r.register_script("""...上述 Lua 脚本内容...""")
stream_name = "order_stream"
group_name = "order_group"
consumer_name = "consumer_1"
dlq_stream = "order_dlq"
max_retries = 3
while True:
# 1. 读取消息
messages = r.xreadgroup(group_name, consumer_name, {stream_name: ">"}, count=1, block=1000)
if not messages:
continue
stream, entries = messages[0]
for message_id, fields in entries:
try:
# 2. 业务处理
process_order(fields)
# 3. 处理成功,确认消息
r.xack(stream_name, group_name, message_id)
# 4. 清理可能存在的重试计数 (防御性编程)
r.delete(f"retry_count:{stream_name}:{message_id}")
except Exception as e:
# 5. 处理失败,执行重试逻辑
# 不发送 XACK,让消息留在 PEL
# 调用 Lua 脚本管理重试次数
retry_key = f"retry_count:{stream_name}:{message_id}"
msg_content = json.dumps(fields)
result = lua_script(keys=[retry_key, dlq_stream], args=[max_retries, msg_content, message_id])
if result == 1:
print(f"Message {message_id} moved to DLQ")
# 可选:手动 ACK 掉原消息,避免 PEL 无限堆积,因为已转入死信
r.xack(stream_name, group_name, message_id)
else:
print(f"Message {message_id} retry count updated, waiting for next claim")验证与监控
部署后需通过以下命令验证机制是否生效:
- 检查待处理消息:
XPENDING order_stream order_group查看是否有长期未确认消息。 - 检查重试计数 Key:
KEYS retry_count:*确认是否有临时计数 Key 生成。 - 检查死信队列:
XLEN order_dlq确认失败消息是否被正确转移。 - 监控告警:配置监控项,当
XLEN order_dlq超过阈值(如 100)时触发告警。
常见风险与排查
- 内存泄漏风险:重试计数 Key 必须设置过期时间(EXPIRE),否则处理成功的消息若未清理计数 Key,会导致内存持续增长。
- 持久化配置:Redis 重启可能导致 PEL 中的 pending 消息丢失。务必开启 AOF 持久化(appendonly yes),并评估 RDB 快照频率。
- 背压处理:若消费者处理速度远低于生产速度,PEL 列表会膨胀。需在应用层实现限流,或监控
XINFO GROUP中的 pending 数量。 - 消息丢失:Lua 脚本中转入死信后,建议手动 ACK 原消息,否则原消息仍占 PEL 名额但实际已处理完毕。
参考来源
- Redis Official Documentation - Stream
- Redis Official Documentation - Lua Scripting
- Redis Official Documentation - Persistence