从 Redis List 迁移到 Stream 不是单条命令能完成的,本质是业务代码重构配合数据过渡方案,适合需要消息确认(ACK)和消费者组功能的场景。
先说结论:这是一次架构升级而非简单配置修改,需评估业务对可靠性的需求后再动手。
- 适合:需要消息不丢失、支持消息回溯、多消费者负载均衡的场景。
- 先看:确认 Redis 版本是否在 5.0 及以上,Stream 类型才可用。
- 建议:采用双写过渡或停服迁移,避免直接切换导致旧数据丢失。
命令速用版
List 与 Stream 的核心操作命令对比如下,迁移时需将左侧命令替换为右侧:
# 生产者发送
# 旧:LPUSH queue_name "message"
# 新:XADD stream_name * message "message"
# 消费者接收
# 旧:BRPOP queue_name 0
# 新:XREADGROUP GROUP group_name consumer_name STREAMS stream_name >
# 消息确认
# 旧:无
# 新:XACK stream_name group_name message_id迁移核心步骤
迁移过程分为环境检查、代码改造(双写)、数据过渡、流量切换四个阶段:
1. 环境检查
登录 Redis 服务器执行INFO server,确认redis_version大于等于 5.0.0。若版本过低,需先升级实例。
2. 代码改造(双写保障)
修改生产者逻辑,新消息同时写入 List 和 Stream。为保证一致性,建议先写 Stream 再写 List,或使用 Redis 事务包裹。以下为 Python 双写示例:
import redis
r = redis.Redis(host='localhost', port=6379)
def publish_message(msg):
# 优先写入 Stream (新架构)
r.xadd('mystream', {'body': msg})
# 兼容写入 List (旧架构,用于过渡)
r.lpush('mylist', msg)
# 注意:生产环境建议增加异常捕获,确保至少写入成功一个3. 数据过渡(迁移脚本)
由于 List 和 Stream 数据结构不兼容,无法直接转换。需启动迁移脚本,逐步读取 List 中剩余旧消息写入 Stream 或直接消费。以下为 Python 迁移脚本示例:
import redis
import time
r = redis.Redis(host='localhost', port=6379)
def migrate_list_to_stream():
while True:
# 阻塞读取 List 剩余消息
result = r.brpop('mylist', timeout=5)
if not result:
# 超时且无数据,认为迁移完成
break
msg = result[1]
# 写入 Stream
r.xadd('mystream', {'body': msg})
print(f"Migrated: {msg}")
if __name__ == '__main__':
migrate_list_to_stream()4. 流量切换
确认 Stream 消费正常且 List 长度为 0 后,将消费者配置完全指向 Stream,移除 List 相关代码。
怎么验证是否生效
迁移完成后,通过以下命令检查状态:
1. 检查消息积压
使用XLEN stream_name查看 Stream 总消息数,结合业务量判断是否正常。
2. 检查待确认消息
使用XPENDING stream_name group_name查看是否有长期未 ACK 的消息。若返回数量持续增加,说明消费者处理逻辑或 ACK 代码有问题。
3. 检查消费组信息
使用XINFO GROUPS stream_name查看消费者组状态,确认consumers数量符合预期。
常见坑与回滚方案
1. 消息 ID 处理XADD时使用*让 Redis 自动生成 ID 即可,不要尝试手动生成时间戳 ID,除非有特殊排序需求,否则易导致 ID 冲突或乱序。
2. 消费者组创建异常XGROUP CREATE在组已存在时会报错,代码中需捕获该异常或先检查是否存在。首次消费时使用>符号表示只读取新消息,若需处理历史数据需指定 ID。
3. 双写一致性风险
双写期间若 Redis 抖动,可能导致 List 和 Stream 数据不一致。建议监控两端写入成功率,过渡期结束后以 Stream 数据为准。
4. 回滚方案
若 Stream 消费出现严重问题,需立即回滚:
1. 停止 Stream 写入,恢复仅写 List。
2. 消费者配置切回 List 消费模式。
3. 检查 Stream 中未消费消息,必要时人工补偿或重新导入 List(需注意顺序)。