在 Kafka 生产者调用 send 方法时,传入实现了 Callback 接口的对象,并在 onCompletion 方法中检查 Exception 参数是否为 null,这是处理异步发送失败的标准方式。
先说结论:必须实现 Callback 接口并在回调中判断异常,同时配合合理的重试配置。
- 适合:对消息可靠性有要求,不能接受静默丢失的生产环境。
- 先准备:确认 kafka-clients 依赖版本与 Broker 兼容。
- 验收:通过日志确认异常捕获逻辑被触发。
快速处理思路
核心是在代码层面实现回调接口。以下是一个包含完整上下文的最小化处理示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerExample {
private static final Logger log = LoggerFactory.getLogger(KafkaProducerExample.class);
public void sendMessage(KafkaProducer<String, String> producer, String topic, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("Send failed", exception);
}
}
});
}
}同时也支持 Lambda 表达式写法(Java 8+),代码更简洁:
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("Send failed", exception);
}
});这段代码确保了当发送过程中出现网络错误、Broker 不可用或序列化问题时,你的程序能感知到。
为什么会这样
Kafka 的 send 方法设计为异步非阻塞,调用后会立即返回一个 Future 对象,而实际的网络请求是由后台 I/O 线程完成的。如果不配置回调,生产者无法知道消息是否真正到达了 Broker。只有当后台线程完成请求(无论成功或失败)时,注册的 Callback 才会被调用。回调函数在 I/O 线程执行,耗时操作会阻塞发送,直接影响吞吐量。这是获取发送状态的唯一异步途径。
分步处理
1. 定义回调逻辑:创建一个类实现 org.apache.kafka.clients.producer.Callback 接口,重写 onCompletion 方法。
2. 判断异常:在方法内部,首先判断 exception 参数。如果不为 null,代表发送失败;如果为 null,代表发送成功(前提是 acks 配置允许收到响应)。
3. 配置重试策略:在 Producer 配置中设置 retries 大于 0(例如 3 或 5),并设置 retry.backoff.ms。这能让客户端在遇到临时错误时自动重试,减少回调收到异常的概率。配置示例如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// 其他配置...4. 处理失败业务:在回调的异常分支中,根据业务重要性选择日志记录、存入本地数据库待后续重发,或触发告警。不要直接在回调中无限重试发送,以免阻塞 I/O 线程。
怎么验证是否生效
1. 查看日志:运行程序后,观察日志文件中是否有你定义的异常打印信息。
2. 模拟故障:在发送消息期间,临时停止 Kafka Broker 服务或断开网络。如果配置正确,回调函数会被触发,且 exception 对象中会包含具体的错误信息(如 TimeoutException 或 NetworkException)。
3. 检查监控:如果集成了 metrics 监控,观察 record-send-rate 和 record-error-rate 指标的变化,确认错误率是否有记录。
常见坑
1. 回调中执行耗时操作:Callback 是在生产者的 I/O 线程中执行的。如果在回调中进行数据库写入或复杂计算,会阻塞消息发送,导致吞吐量下降。建议将失败消息放入内存队列,由独立线程处理。
2. acks 配置影响:如果配置 acks=0,生产者不等待 Broker 响应,此时回调仍然会被调用,但无法确认 Broker 是否真正接收成功,通常只用于允许丢失的场景。可靠性场景建议使用 acks=all 或 acks=1。
3. 重试风暴:不要在回调失败逻辑中直接再次调用 send 而不加限制。如果 Broker 持续不可用,这会形成无限重试,压垮客户端和网络。应使用指数退避或有上限的重试机制。
4. 资源未关闭:程序退出前必须调用 producer.close(),否则可能导致缓冲区中的消息未发送完成就丢失,回调也不会被执行。
参考来源
- Apache Kafka Documentation, "Producer API", https://kafka.apache.org/documentation/#producerapi
- Apache Kafka Javadoc, "Interface Callback", https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/producer/Callback.html