Redis Lua 脚本能确保消息获取阶段的原子性,避免并发竞态,但真正的“精确一次消费”必须配合业务层幂等设计和确认机制,仅适合轻量级、低吞吐场景。单纯依靠 Lua 无法解决消费者崩溃后的消息恢复,必须引入“可见性超时(Visibility Timeout)”机制。
先说结论:用 Lua 封装“取消息 + 放入处理中队列”可解决 Redis 层面的原子性问题,但无法自动处理消费者崩溃后的消息恢复,需配合超时重投脚本与业务幂等。
- 适合:内部轻量任务、对可靠性要求不极端的异步解耦场景
- 重点看:是否将消息从“待消费”移至“处理中”而非直接标记“已完成”
- 别忽略:消费者处理失败后的重试机制与死信队列设计
核心方案设计
要实现可靠消费,不能直接在获取时标记为“已消费”,而应采用“隐藏消息 + 超时回滚”策略。需准备三个数据结构:
queue:msgs(List):待消费消息队列queue:processing(ZSet):正在处理的消息,Score 设为过期时间戳queue:acked(Set):已成功处理的消息 ID 集合,用于幂等校验
命令速用版
1. 原子获取消息脚本
将消息从 List 弹出并加入 ZSet,设置过期时间。若消费者崩溃,消息会留在 ZSet 中直到超时。
local msg = redis.call('lpop', KEYS[1])
if not msg then return nil end
local expire_at = tonumber(ARGV[1])
redis.call('zadd', KEYS[2], expire_at, msg)
return msg调用示例:eval SCRIPT 2 queue:msgs queue:processing <expire_timestamp>
2. 确认消费脚本(Ack)
业务处理成功后,从 ZSet 移除消息,并记录到已处理集合。
local msg = ARGV[1]
redis.call('zrem', KEYS[1], msg)
redis.call('sadd', KEYS[2], msg)
return 13. 超时重投脚本(Retry)
定时任务调用此脚本,将 ZSet 中过期的消息移回 List 队列。
local now = tonumber(ARGV[1])
local expired = redis.call('zrangebyscore', KEYS[1], 0, now)
if #expired > 0 then
redis.call('zrem', KEYS[1], unpack(expired))
for i, msg in ipairs(expired) do
redis.call('rpush', KEYS[2], msg)
end
end
return #expired消费者完整流程
代码层面需严格遵循以下步骤,确保异常情况下消息不丢失:
- 获取消息:调用获取脚本,若返回 nil 则等待后重试。
- 幂等检查:查询业务数据库或 Redis
queue:acked,若消息已处理过,直接调用 Ack 脚本跳过。 - 执行业务:处理核心逻辑。若抛出异常,不要调用 Ack 脚本,让消息自然超时。
- 确认消费:业务成功后,调用 Ack 脚本移除处理中状态。
伪代码示例:
while true:
msg = redis.eval(GET_SCRIPT, timeout=now+30s)
if not msg: sleep(1); continue
if is_processed(msg.id):
redis.eval(ACK_SCRIPT, msg)
continue
try:
business_logic(msg)
redis.eval(ACK_SCRIPT, msg)
except Exception:
# 不执行 Ack,依赖超时重投脚本恢复消息
log.error("Processing failed, will retry after timeout")怎么验证是否生效
1. 并发测试
启动多个消费者实例同时消费同一队列,观察日志中是否有相同消息 ID 被处理多次。若无重复日志,说明原子获取生效。
2. 模拟崩溃测试
在脚本返回后、业务处理前强制杀死消费者进程。等待超时时间(如 30 秒)后,运行重投脚本,检查消息是否重新回到 queue:msgs 队列。
3. 检查集合数据
使用 ZRANGE queue:processing 0 -1 查看正在处理的消息,确认无长期滞留项;使用 SMEMBERS queue:acked 确认已消费记录。
常见坑与风险
1. 误以为 Lua 能解决所有丢失问题
脚本执行成功仅代表消息从队列移除,若消费者随后宕机且无重试机制,消息实际上已丢失。必须部署定时任务执行“超时重投脚本”。
2. 消息体过大
若消息体超过 10KB,频繁 LPOP 加 Lua 序列化开销明显,且 ZSet 成员过大影响性能。此时应考虑 Redis Stream 类型而非 List + Lua。
3. 时钟 skew 问题
超时重投依赖时间戳,若 Redis 服务器与应用服务器时间不一致,可能导致消息过早或过晚重投。建议统一使用 Redis 服务器时间。
4. ZSet 扫描性能
重投脚本使用 ZRANGEBYSCORE,若 queue:processing 积累大量过期消息,可能导致 Redis 阻塞。生产环境需限制单次重投数量,分批次处理。
参考来源
- Redis Official Documentation - Lua Scripting
- Redis Official Documentation - Data Types (List & ZSet)
- Engineering Best Practices for Reliable Queues