旧版本 RabbitMQ 迁移到新集群如何保证数据不丢失

文章导读
迁移旧版本 RabbitMQ 到新集群,最稳妥的方案是采用“双写双读”配合消息持久化与确认机制,而非直接拷贝数据文件,尤其是在跨版本或跨机房场景下。
📋 目录
  1. A 前置准备:插件与工具
  2. B 分步处理
  3. C 怎么验证是否生效
  4. D 常见坑
  5. E 参考来源
A A

迁移旧版本 RabbitMQ 到新集群,最稳妥的方案是采用“双写双读”配合消息持久化与确认机制,而非直接拷贝数据文件,尤其是在跨版本或跨机房场景下。

先说结论:直接拷贝存储文件风险极高,推荐通过消息同步工具或双写方案过渡,并确保端到端持久化。

  • 适合跨版本升级或跨集群迁移场景
  • 先看版本兼容性与队列持久化配置
  • 建议通过消息量对比验证一致性

前置准备:插件与工具

迁移前需确保管理插件和同步插件已启用,并准备好 CLI 工具。

1. 启用管理插件(源与目标集群)

rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel

2. 获取 rabbitmqadmin 工具

该工具默认不安装在系统路径,需从管理界面下载:

wget http://<management-node-ip>:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin
chmod +x /usr/local/bin/rabbitmqadmin

分步处理

1. 评估与准备
检查源集群版本与目标集群版本的兼容性。若跨大版本(如 3.x 到 3.9+),需注意 Quorum Queues 引入的元数据存储变化(Raft 共识),但 Classic Queue 仍主要依赖 Mnesia。确保所有队列设置为 durable=true,消息发送设置 deliveryMode=2。

2. 元数据迁移
使用 rabbitmqadmin 导出交换机、队列和绑定关系定义文件,并在目标集群导入。不要直接拷贝 mnesia 数据库文件,除非是同版本同环境的热备切换。

rabbitmqadmin export definitions.json
# 在目标集群执行导入
rabbitmqadmin import definitions.json

3. 数据同步
方案 A(Shovel 插件):配置 Shovel 将旧集群消息实时同步到新集群队列。需先启用插件。

rabbitmqctl set_parameter shovel my_shovel '{"src-uri": "amqp://user:pass@source:5672/%2f", "src-queue": "source_queue", "dest-uri": "amqp://user:pass@dest:5672/%2f", "dest-queue": "dest_queue"}'

方案 B(双写双读):修改生产端代码同时向新旧集群发送消息,消费端先消费旧集群,逐步切换至新集群。以下是 Java Spring AMQP 双写示例逻辑:

旧版本 RabbitMQ 迁移到新集群如何保证数据不丢失
@Bean
public RabbitTemplate primaryTemplate() {
    // 配置旧集群 ConnectionFactory
    return new RabbitTemplate(primaryConnectionFactory());
}

@Bean
public RabbitTemplate secondaryTemplate() {
    // 配置新集群 ConnectionFactory
    return new RabbitTemplate(secondaryConnectionFactory());
}

// 发送时双写
public void sendMessage(String msg) {
    primaryTemplate().convertAndSend("exchange", "route", msg);
    try {
        secondaryTemplate().convertAndSend("exchange", "route", msg);
    } catch (Exception e) {
        // 记录日志,不影响主流程
        log.warn("Secondary cluster send failed", e);
    }
}

4. 切换与下线
确认新集群消息积压为零且消费正常后,停止旧集群生产端流量,待旧消息消费完毕后下线旧服务。

怎么验证是否生效

1. 命令行对比

登录管理界面或 CLI,对比新旧集群的 Queue 数量与 Message 数量是否一致。

rabbitmqctl list_queues name messages_ready messages_unacknowledged > queue_check.txt

2. 一致性校验脚本

可通过简单脚本对比两端消息总数(需确保迁移期间无新消息写入):

#!/bin/bash
SOURCE_COUNT=$(rabbitmqctl -n source_node list_queues messages | awk '{sum+=$2} END {print sum}')
DEST_COUNT=$(rabbitmqctl -n dest_node list_queues messages | awk '{sum+=$2} END {print sum}')
if [ "$SOURCE_COUNT" == "$DEST_COUNT" ]; then
  echo "Check Passed: $SOURCE_COUNT == $DEST_COUNT"
else
  echo "Check Failed: $SOURCE_COUNT != $DEST_COUNT"
fi

3. 监控指标

观察消费者端的 ACK 日志,确认没有大量 Redelivered 消息。在迁移期间,监控源集群的 message_ready 和 message_unacknowledged 指标,确保最终归零。

常见坑

版本存储引擎差异:RabbitMQ 3.9+ 引入了 Quorum Queues,元数据存储机制有所变化,但 Classic Queue 仍主要依赖 Mnesia。跨版本直接拷贝数据文件官方不支持,极易导致启动失败。
非持久化消息:若消息未设置持久化,迁移期间重启服务会导致内存中消息丢失。
自动 ACK 风险:消费者若使用自动 ACK,宕机时消息会被直接删除,迁移期间建议改为手动 ACK。
rabbitmqadmin 不可用:该工具需手动下载,未下载前直接执行命令会报错 command not found。

参考来源

  • RabbitMQ Official Docs: RabbitMQ Clustering - https://www.rabbitmq.com/clustering.html
  • RabbitMQ Official Docs: Shovel Plugin - https://www.rabbitmq.com/shovel.html
  • RabbitMQ Official Docs: Production Checklist - https://www.rabbitmq.com/production-checklist.html