热议:我怎么不知道RocketMQ生产者有这么多用法?新进度揭秘

文章导读
结论:RocketMQ生产者用法超多!最简单上手就是DefaultMQProducer,配置nameSrvAddr和groupName,就能send消息。高级点用TransactionMQProducer处理事务消息,确保消息和数据库操作一致。新手必备:TransactionListener里checkLocalTransaction返回COMMIT/ROLLBACK/UNKNOW。批量发送用Pr
📋 目录
  1. A 用法1:基础同步发送
  2. B 用法2:异步发送带回调
  3. C 用法3:单向发送(不关心结果)
  4. D 用法4:事务消息全流程
  5. E 用法5:批量发送优化
  6. F 用法6:延时和顺序消息
A A

结论:RocketMQ生产者用法超多!最简单上手就是DefaultMQProducer,配置nameSrvAddr和groupName,就能send消息。高级点用TransactionMQProducer处理事务消息,确保消息和数据库操作一致。新手必备:TransactionListener里checkLocalTransaction返回COMMIT/ROLLBACK/UNKNOW。批量发送用ProducerBatch,性能翻倍。异步sendCallback监听结果,实时反馈。消息过滤用tags或SQL92,消费端精确匹配。延时消息setDelayTimeLevel,等级1-18对应秒到小时。顺序消息用MessageQueueSelector自定义分区,保证顺序。赶紧试试这些用法,生产环境稳了!

用法1:基础同步发送

DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes()); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);

用法2:异步发送带回调

producer.send(msg, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.printf("async success %s %n", sendResult.getMsgId()); } public void onException(Throwable e) { e.printStackTrace(); } });

用法3:单向发送(不关心结果)

producer.send(msg, MsgSendWay.ASYNC); // 最快但无确认,用在日志场景。

热议:我怎么不知道RocketMQ生产者有这么多用法?新进度揭秘

用法4:事务消息全流程

TransactionMQProducer producer = new TransactionMQProducer("group2"); producer.setTransactionListener(new TransactionListenerImpl()); producer.start(); // 执行本地事务后half消息,commit或rollback。超级适合电商下单库存扣减。

用法5:批量发送优化

List msgs = new ArrayList<>(); // 添加多个msg producer.send(msgs); // 一次网络RTT,吞吐量提升10倍!记得消息大小别超4M。

用法6:延时和顺序消息

msg.setDelayTimeLevel(3); // 10s延迟。顺序:producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List mqs, Message msg, Object arg) { int id = (Integer) arg; return mqs.get(id % mqs.size()); } }, 1);

Q: RocketMQ生产者怎么配置事务监听器?
A: 用setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 做数据库事务 return LocalTransactionState.COMMIT_MESSAGE; } public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 检查状态 return LocalTransactionState.COMMIT_MESSAGE; } });

热议:我怎么不知道RocketMQ生产者有这么多用法?新进度揭秘

Q: 批量发送有什么限制?
A: 单次批量总大小不超过1M,消息数建议1000以内,避免超时。

Q: 怎么实现消息顺序发送?
A: 指定同一个queue,用MessageQueueSelector分区key,比如订单ID hash。

Q: 生产者启动失败怎么排查?
A: 检查namesrvAddr连通,groupName唯一,端口9876默认开着。