旧版 Redis List 队列迁移到 Stream 结构怎么操作?

文章导读
从 Redis List 迁移到 Stream 不是单条命令能完成的,本质是业务代码重构配合数据过渡方案,适合需要消息确认(ACK)和消费者组功能的场景。
📋 目录
  1. 命令速用版
  2. 迁移核心步骤
  3. 怎么验证是否生效
  4. 常见坑与回滚方案
A A

从 Redis List 迁移到 Stream 不是单条命令能完成的,本质是业务代码重构配合数据过渡方案,适合需要消息确认(ACK)和消费者组功能的场景。

先说结论:这是一次架构升级而非简单配置修改,需评估业务对可靠性的需求后再动手。

  • 适合:需要消息不丢失、支持消息回溯、多消费者负载均衡的场景。
  • 先看:确认 Redis 版本是否在 5.0 及以上,Stream 类型才可用。
  • 建议:采用双写过渡或停服迁移,避免直接切换导致旧数据丢失。

命令速用版

List 与 Stream 的核心操作命令对比如下,迁移时需将左侧命令替换为右侧:

# 生产者发送
# 旧:LPUSH queue_name "message"
# 新:XADD stream_name * message "message"

# 消费者接收
# 旧:BRPOP queue_name 0
# 新:XREADGROUP GROUP group_name consumer_name STREAMS stream_name >

# 消息确认
# 旧:无
# 新:XACK stream_name group_name message_id

迁移核心步骤

迁移过程分为环境检查、代码改造(双写)、数据过渡、流量切换四个阶段:

1. 环境检查
登录 Redis 服务器执行INFO server,确认redis_version大于等于 5.0.0。若版本过低,需先升级实例。

旧版 Redis List 队列迁移到 Stream 结构怎么操作?

2. 代码改造(双写保障)
修改生产者逻辑,新消息同时写入 List 和 Stream。为保证一致性,建议先写 Stream 再写 List,或使用 Redis 事务包裹。以下为 Python 双写示例:

import redis

r = redis.Redis(host='localhost', port=6379)

def publish_message(msg):
    # 优先写入 Stream (新架构)
    r.xadd('mystream', {'body': msg})
    # 兼容写入 List (旧架构,用于过渡)
    r.lpush('mylist', msg)
    # 注意:生产环境建议增加异常捕获,确保至少写入成功一个

3. 数据过渡(迁移脚本)
由于 List 和 Stream 数据结构不兼容,无法直接转换。需启动迁移脚本,逐步读取 List 中剩余旧消息写入 Stream 或直接消费。以下为 Python 迁移脚本示例:

import redis
import time

r = redis.Redis(host='localhost', port=6379)

def migrate_list_to_stream():
    while True:
        # 阻塞读取 List 剩余消息
        result = r.brpop('mylist', timeout=5)
        if not result:
            # 超时且无数据,认为迁移完成
            break
        msg = result[1]
        # 写入 Stream
        r.xadd('mystream', {'body': msg})
        print(f"Migrated: {msg}")

if __name__ == '__main__':
    migrate_list_to_stream()

4. 流量切换
确认 Stream 消费正常且 List 长度为 0 后,将消费者配置完全指向 Stream,移除 List 相关代码。

怎么验证是否生效

迁移完成后,通过以下命令检查状态:

旧版 Redis List 队列迁移到 Stream 结构怎么操作?

1. 检查消息积压
使用XLEN stream_name查看 Stream 总消息数,结合业务量判断是否正常。

2. 检查待确认消息
使用XPENDING stream_name group_name查看是否有长期未 ACK 的消息。若返回数量持续增加,说明消费者处理逻辑或 ACK 代码有问题。

3. 检查消费组信息
使用XINFO GROUPS stream_name查看消费者组状态,确认consumers数量符合预期。

旧版 Redis List 队列迁移到 Stream 结构怎么操作?

常见坑与回滚方案

1. 消息 ID 处理
XADD时使用*让 Redis 自动生成 ID 即可,不要尝试手动生成时间戳 ID,除非有特殊排序需求,否则易导致 ID 冲突或乱序。

2. 消费者组创建异常
XGROUP CREATE在组已存在时会报错,代码中需捕获该异常或先检查是否存在。首次消费时使用>符号表示只读取新消息,若需处理历史数据需指定 ID。

3. 双写一致性风险
双写期间若 Redis 抖动,可能导致 List 和 Stream 数据不一致。建议监控两端写入成功率,过渡期结束后以 Stream 数据为准。

4. 回滚方案
若 Stream 消费出现严重问题,需立即回滚:
1. 停止 Stream 写入,恢复仅写 List。
2. 消费者配置切回 List 消费模式。
3. 检查 Stream 中未消费消息,必要时人工补偿或重新导入 List(需注意顺序)。