结论:RocketMQ生产者用法超多!最简单上手就是DefaultMQProducer,配置nameSrvAddr和groupName,就能send消息。高级点用TransactionMQProducer处理事务消息,确保消息和数据库操作一致。新手必备:TransactionListener里checkLocalTransaction返回COMMIT/ROLLBACK/UNKNOW。批量发送用ProducerBatch,性能翻倍。异步sendCallback监听结果,实时反馈。消息过滤用tags或SQL92,消费端精确匹配。延时消息setDelayTimeLevel,等级1-18对应秒到小时。顺序消息用MessageQueueSelector自定义分区,保证顺序。赶紧试试这些用法,生产环境稳了!
用法1:基础同步发送
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes()); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);
用法2:异步发送带回调
producer.send(msg, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.printf("async success %s %n", sendResult.getMsgId()); } public void onException(Throwable e) { e.printStackTrace(); } });
用法3:单向发送(不关心结果)
producer.send(msg, MsgSendWay.ASYNC); // 最快但无确认,用在日志场景。
用法4:事务消息全流程
TransactionMQProducer producer = new TransactionMQProducer("group2"); producer.setTransactionListener(new TransactionListenerImpl()); producer.start(); // 执行本地事务后half消息,commit或rollback。超级适合电商下单库存扣减。
用法5:批量发送优化
List
用法6:延时和顺序消息
msg.setDelayTimeLevel(3); // 10s延迟。顺序:producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List
Q: RocketMQ生产者怎么配置事务监听器?
A: 用setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 做数据库事务 return LocalTransactionState.COMMIT_MESSAGE; } public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 检查状态 return LocalTransactionState.COMMIT_MESSAGE; } });
Q: 批量发送有什么限制?
A: 单次批量总大小不超过1M,消息数建议1000以内,避免超时。
Q: 怎么实现消息顺序发送?
A: 指定同一个queue,用MessageQueueSelector分区key,比如订单ID hash。
Q: 生产者启动失败怎么排查?
A: 检查namesrvAddr连通,groupName唯一,端口9876默认开着。