如何使用 Redis Lua 脚本实现消息队列的精确一次消费?

文章导读
Redis Lua 脚本能确保消息获取阶段的原子性,避免并发竞态,但真正的“精确一次消费”必须配合业务层幂等设计和确认机制,仅适合轻量级、低吞吐场景。单纯依靠 Lua 无法解决消费者崩溃后的消息恢复,必须引入“可见性超时(Visibility Timeout)”机制。
📋 目录
  1. 核心方案设计
  2. 命令速用版
  3. 消费者完整流程
  4. 怎么验证是否生效
  5. 常见坑与风险
  6. 参考来源
A A

Redis Lua 脚本能确保消息获取阶段的原子性,避免并发竞态,但真正的“精确一次消费”必须配合业务层幂等设计和确认机制,仅适合轻量级、低吞吐场景。单纯依靠 Lua 无法解决消费者崩溃后的消息恢复,必须引入“可见性超时(Visibility Timeout)”机制。

先说结论:用 Lua 封装“取消息 + 放入处理中队列”可解决 Redis 层面的原子性问题,但无法自动处理消费者崩溃后的消息恢复,需配合超时重投脚本与业务幂等。

  • 适合:内部轻量任务、对可靠性要求不极端的异步解耦场景
  • 重点看:是否将消息从“待消费”移至“处理中”而非直接标记“已完成”
  • 别忽略:消费者处理失败后的重试机制与死信队列设计

核心方案设计

要实现可靠消费,不能直接在获取时标记为“已消费”,而应采用“隐藏消息 + 超时回滚”策略。需准备三个数据结构:

  • queue:msgs (List):待消费消息队列
  • queue:processing (ZSet):正在处理的消息,Score 设为过期时间戳
  • queue:acked (Set):已成功处理的消息 ID 集合,用于幂等校验

命令速用版

1. 原子获取消息脚本

将消息从 List 弹出并加入 ZSet,设置过期时间。若消费者崩溃,消息会留在 ZSet 中直到超时。

local msg = redis.call('lpop', KEYS[1])
if not msg then return nil end
local expire_at = tonumber(ARGV[1])
redis.call('zadd', KEYS[2], expire_at, msg)
return msg

调用示例:eval SCRIPT 2 queue:msgs queue:processing <expire_timestamp>

2. 确认消费脚本(Ack)

业务处理成功后,从 ZSet 移除消息,并记录到已处理集合。

local msg = ARGV[1]
redis.call('zrem', KEYS[1], msg)
redis.call('sadd', KEYS[2], msg)
return 1

3. 超时重投脚本(Retry)

如何使用 Redis Lua 脚本实现消息队列的精确一次消费?

定时任务调用此脚本,将 ZSet 中过期的消息移回 List 队列。

local now = tonumber(ARGV[1])
local expired = redis.call('zrangebyscore', KEYS[1], 0, now)
if #expired > 0 then
    redis.call('zrem', KEYS[1], unpack(expired))
    for i, msg in ipairs(expired) do
        redis.call('rpush', KEYS[2], msg)
    end
end
return #expired

消费者完整流程

代码层面需严格遵循以下步骤,确保异常情况下消息不丢失:

  1. 获取消息:调用获取脚本,若返回 nil 则等待后重试。
  2. 幂等检查:查询业务数据库或 Redis queue:acked,若消息已处理过,直接调用 Ack 脚本跳过。
  3. 执行业务:处理核心逻辑。若抛出异常,不要调用 Ack 脚本,让消息自然超时。
  4. 确认消费:业务成功后,调用 Ack 脚本移除处理中状态。

伪代码示例:

while true:
    msg = redis.eval(GET_SCRIPT, timeout=now+30s)
    if not msg: sleep(1); continue
    
    if is_processed(msg.id): 
        redis.eval(ACK_SCRIPT, msg)
        continue
    
    try:
        business_logic(msg)
        redis.eval(ACK_SCRIPT, msg)
    except Exception:
        # 不执行 Ack,依赖超时重投脚本恢复消息
        log.error("Processing failed, will retry after timeout")

怎么验证是否生效

1. 并发测试

启动多个消费者实例同时消费同一队列,观察日志中是否有相同消息 ID 被处理多次。若无重复日志,说明原子获取生效。

2. 模拟崩溃测试

在脚本返回后、业务处理前强制杀死消费者进程。等待超时时间(如 30 秒)后,运行重投脚本,检查消息是否重新回到 queue:msgs 队列。

3. 检查集合数据

使用 ZRANGE queue:processing 0 -1 查看正在处理的消息,确认无长期滞留项;使用 SMEMBERS queue:acked 确认已消费记录。

如何使用 Redis Lua 脚本实现消息队列的精确一次消费?

常见坑与风险

1. 误以为 Lua 能解决所有丢失问题

脚本执行成功仅代表消息从队列移除,若消费者随后宕机且无重试机制,消息实际上已丢失。必须部署定时任务执行“超时重投脚本”。

2. 消息体过大

若消息体超过 10KB,频繁 LPOP 加 Lua 序列化开销明显,且 ZSet 成员过大影响性能。此时应考虑 Redis Stream 类型而非 List + Lua。

3. 时钟 skew 问题

超时重投依赖时间戳,若 Redis 服务器与应用服务器时间不一致,可能导致消息过早或过晚重投。建议统一使用 Redis 服务器时间。

4. ZSet 扫描性能

重投脚本使用 ZRANGEBYSCORE,若 queue:processing 积累大量过期消息,可能导致 Redis 阻塞。生产环境需限制单次重投数量,分批次处理。

参考来源

  • Redis Official Documentation - Lua Scripting
  • Redis Official Documentation - Data Types (List & ZSet)
  • Engineering Best Practices for Reliable Queues