RocketMQ和RabbitMQ事务消息实现机制有什么不同?

文章导读
RocketMQ 原生支持分布式事务消息,通过半消息机制保证本地事务与消息发送的原子性;RabbitMQ 原生仅支持 AMQP 协议层面的事务,不具备分布式事务能力,需结合本地消息表等方案实现最终一致性。
📋 目录
  1. 核心机制差异与时序
  2. RocketMQ 事务消息实操
  3. RabbitMQ 可靠投递实操
  4. 验证与排查
  5. 常见坑与风险
  6. 参考来源
A A

RocketMQ 原生支持分布式事务消息,通过半消息机制保证本地事务与消息发送的原子性;RabbitMQ 原生仅支持 AMQP 协议层面的事务,不具备分布式事务能力,需结合本地消息表等方案实现最终一致性。

先说结论:若业务强依赖「本地数据库操作 + 消息发送」的原子性,优先选 RocketMQ;若仅需消息可靠投递且能接受业务层兜底,RabbitMQ 配合确认机制即可。

  • 适合:RocketMQ 适合微服务间强一致性场景,RabbitMQ 适合高可靠投递但事务需自理的场景
  • 重点看:RocketMQ 需实现事务监听器,RabbitMQ 需关注 Confirm 机制与业务逻辑配合
  • 别忽略:RabbitMQ 开启 AMQP 事务会显著降低吞吐量,生产环境通常改用 Confirm 模式

核心机制差异与时序

两者设计目标不同。RocketMQ 诞生于电商交易场景,内置了事务协调器。RabbitMQ 遵循 AMQP 标准,其事务机制仅保证单 Channel 内消息发送的原子性。

RocketMQ 事务消息时序:

  1. Producer 发送 Half Message(半消息)到 Broker。
  2. Broker 返回发送成功。
  3. Producer 执行本地数据库事务。
  4. 根据本地事务结果,向 Broker 提交 Commit 或 Rollback。
  5. 若 Broker 未收到确认,会定期回查 Producer 本地事务状态。

RabbitMQ 本地消息表时序:

  1. 开启本地数据库事务。
  2. 写入业务数据表。
  3. 写入本地消息表(状态为待发送)。
  4. 提交本地数据库事务。
  5. 异步读取本地消息表发送消息到 RabbitMQ。
  6. 收到 Confirm 回调后更新本地消息表状态为已完成。

RocketMQ 事务消息实操

1. 核心代码实现

需实现 TransactionListener 接口,处理本地事务执行及回查逻辑。

public class OrderTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地业务逻辑,如扣减库存
        try {
            orderService.createOrder((Order) arg);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务回查逻辑,根据 msg 中的事务 ID 查询本地事务状态
        String transactionId = msg.getUserProperty("UNIQ_KEY");
        boolean status = orderService.queryOrderStatus(transactionId);
        return status ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

2. 配置与风险

Producer 端需设置 TransactionalId 并注册监听器。注意 Broker 端配置 transactionCheckInterval(默认 60 秒),若设置过短会增加 Broker 负载;若过长则事务恢复慢。生产环境建议保持默认或根据业务容忍度微调。

RocketMQ和RabbitMQ事务消息实现机制有什么不同?

RabbitMQ 可靠投递实操

1. 本地消息表结构

需创建独立表记录消息状态,确保与业务数据在同一事务中提交。

CREATE TABLE `local_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `message_id` varchar(64) NOT NULL,
  `exchange` varchar(64) DEFAULT NULL,
  `routing_key` varchar(64) DEFAULT NULL,
  `content` text,
  `status` tinyint(4) DEFAULT '0' COMMENT '0:待发送 1:已发送',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
);

2. 核心代码实现

业务服务中需保证写库与写消息表在同一 @Transactional 范围内。发送后通过 Confirm 回调更新状态。

@Autowired
private RabbitTemplate rabbitTemplate;

@Transactional
public void createOrder(Order order) {
    // 1. 写入业务表
    orderMapper.insert(order);
    // 2. 写入本地消息表
    LocalMessage msg = new LocalMessage();
    msg.setMessageId(UUID.randomUUID().toString());
    msg.setContent(JSON.toJSONString(order));
    msg.setStatus(0);
    messageMapper.insert(msg);
    // 3. 发送消息(事务提交后异步发送或立即发送)
    CorrelationData correlationData = new CorrelationData(msg.getMessageId());
    rabbitTemplate.convertAndSend("order.exchange", "order.key", order, correlationData);
}

// 配置 Confirm 回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        // 更新本地消息表状态为 1
        messageMapper.updateStatus(correlationData.getId(), 1);
    } else {
        // 记录失败日志,触发重试机制
        log.error("消息发送失败:{}", cause);
    }
});

3. 一致性保证细节

必须确保「业务数据写入」与「本地消息表写入」在同一个本地数据库事务中。若分开操作,可能出现业务成功但消息记录未保存,导致消息丢失且无法重试。

验证与排查

1. RocketMQ 验证

  • 查看 Broker 端 transactionCheckInterval 配置。
  • 模拟本地事务异常抛出,观察 Consumer 是否未收到消息,且 Broker 日志显示回滚。
  • 模拟 Producer 宕机,验证 Broker 是否发起回查请求。

2. RabbitMQ 验证

RocketMQ和RabbitMQ事务消息实现机制有什么不同?
  • 查询本地消息表,确认业务提交后是否存在状态为 0 的记录。
  • 模拟 Broker 宕机,确认本地是否有定时任务扫描未发送消息并重试。
  • 查看 Confirm 回调日志,确认 ack/nack 是否正确处理。

常见坑与风险

1. RabbitMQ 事务性能陷阱

RabbitMQ 的 AMQP 事务(txSelect)是同步阻塞的,每发一条消息都要等待确认,吞吐量会大幅下降,生产环境高并发场景慎用,建议用 Confirm 异步确认替代。

2. RocketMQ 回查逻辑幂等

事务回查接口可能被多次调用,需确保回查逻辑幂等,避免重复执行本地业务查询导致状态不一致。建议仅查询状态,不执行变更操作。

3. 消息重复消费

无论哪种方案,网络抖动都可能导致消息重复,消费者端必须实现幂等处理(如通过唯一业务 ID 去重),不能仅依赖 MQ 的事务机制。

4. 本地消息表清理

本地消息表数据会持续增长,需定期清理已完成状态的历史数据,避免影响查询性能。

参考来源