RocketMQ 原生支持分布式事务消息,通过半消息机制保证本地事务与消息发送的原子性;RabbitMQ 原生仅支持 AMQP 协议层面的事务,不具备分布式事务能力,需结合本地消息表等方案实现最终一致性。
先说结论:若业务强依赖「本地数据库操作 + 消息发送」的原子性,优先选 RocketMQ;若仅需消息可靠投递且能接受业务层兜底,RabbitMQ 配合确认机制即可。
- 适合:RocketMQ 适合微服务间强一致性场景,RabbitMQ 适合高可靠投递但事务需自理的场景
- 重点看:RocketMQ 需实现事务监听器,RabbitMQ 需关注 Confirm 机制与业务逻辑配合
- 别忽略:RabbitMQ 开启 AMQP 事务会显著降低吞吐量,生产环境通常改用 Confirm 模式
核心机制差异与时序
两者设计目标不同。RocketMQ 诞生于电商交易场景,内置了事务协调器。RabbitMQ 遵循 AMQP 标准,其事务机制仅保证单 Channel 内消息发送的原子性。
RocketMQ 事务消息时序:
- Producer 发送 Half Message(半消息)到 Broker。
- Broker 返回发送成功。
- Producer 执行本地数据库事务。
- 根据本地事务结果,向 Broker 提交 Commit 或 Rollback。
- 若 Broker 未收到确认,会定期回查 Producer 本地事务状态。
RabbitMQ 本地消息表时序:
- 开启本地数据库事务。
- 写入业务数据表。
- 写入本地消息表(状态为待发送)。
- 提交本地数据库事务。
- 异步读取本地消息表发送消息到 RabbitMQ。
- 收到 Confirm 回调后更新本地消息表状态为已完成。
RocketMQ 事务消息实操
1. 核心代码实现
需实现 TransactionListener 接口,处理本地事务执行及回查逻辑。
public class OrderTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地业务逻辑,如扣减库存
try {
orderService.createOrder((Order) arg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查逻辑,根据 msg 中的事务 ID 查询本地事务状态
String transactionId = msg.getUserProperty("UNIQ_KEY");
boolean status = orderService.queryOrderStatus(transactionId);
return status ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
}2. 配置与风险
Producer 端需设置 TransactionalId 并注册监听器。注意 Broker 端配置 transactionCheckInterval(默认 60 秒),若设置过短会增加 Broker 负载;若过长则事务恢复慢。生产环境建议保持默认或根据业务容忍度微调。
RabbitMQ 可靠投递实操
1. 本地消息表结构
需创建独立表记录消息状态,确保与业务数据在同一事务中提交。
CREATE TABLE `local_message` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`message_id` varchar(64) NOT NULL,
`exchange` varchar(64) DEFAULT NULL,
`routing_key` varchar(64) DEFAULT NULL,
`content` text,
`status` tinyint(4) DEFAULT '0' COMMENT '0:待发送 1:已发送',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
);2. 核心代码实现
业务服务中需保证写库与写消息表在同一 @Transactional 范围内。发送后通过 Confirm 回调更新状态。
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void createOrder(Order order) {
// 1. 写入业务表
orderMapper.insert(order);
// 2. 写入本地消息表
LocalMessage msg = new LocalMessage();
msg.setMessageId(UUID.randomUUID().toString());
msg.setContent(JSON.toJSONString(order));
msg.setStatus(0);
messageMapper.insert(msg);
// 3. 发送消息(事务提交后异步发送或立即发送)
CorrelationData correlationData = new CorrelationData(msg.getMessageId());
rabbitTemplate.convertAndSend("order.exchange", "order.key", order, correlationData);
}
// 配置 Confirm 回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 更新本地消息表状态为 1
messageMapper.updateStatus(correlationData.getId(), 1);
} else {
// 记录失败日志,触发重试机制
log.error("消息发送失败:{}", cause);
}
});3. 一致性保证细节
必须确保「业务数据写入」与「本地消息表写入」在同一个本地数据库事务中。若分开操作,可能出现业务成功但消息记录未保存,导致消息丢失且无法重试。
验证与排查
1. RocketMQ 验证
- 查看 Broker 端
transactionCheckInterval配置。 - 模拟本地事务异常抛出,观察 Consumer 是否未收到消息,且 Broker 日志显示回滚。
- 模拟 Producer 宕机,验证 Broker 是否发起回查请求。
2. RabbitMQ 验证
- 查询本地消息表,确认业务提交后是否存在状态为 0 的记录。
- 模拟 Broker 宕机,确认本地是否有定时任务扫描未发送消息并重试。
- 查看 Confirm 回调日志,确认 ack/nack 是否正确处理。
常见坑与风险
1. RabbitMQ 事务性能陷阱
RabbitMQ 的 AMQP 事务(txSelect)是同步阻塞的,每发一条消息都要等待确认,吞吐量会大幅下降,生产环境高并发场景慎用,建议用 Confirm 异步确认替代。
2. RocketMQ 回查逻辑幂等
事务回查接口可能被多次调用,需确保回查逻辑幂等,避免重复执行本地业务查询导致状态不一致。建议仅查询状态,不执行变更操作。
3. 消息重复消费
无论哪种方案,网络抖动都可能导致消息重复,消费者端必须实现幂等处理(如通过唯一业务 ID 去重),不能仅依赖 MQ 的事务机制。
4. 本地消息表清理
本地消息表数据会持续增长,需定期清理已完成状态的历史数据,避免影响查询性能。