要保障Kafka消息不重复,选择幂等生产者是最简单有效的方式。只需在生产者配置中设置enable.idempotence=true,就能自动开启幂等性,无需额外代码。Kafka会为每个生产者分配Producer ID (PID)和序列号,确保相同消息不会重复写入。遇到网络抖动或重试时,Broker会识别重复并忽略。
来自CSDN文章片段
Kafka幂等生产者的核心是通过引入生产者ID(Producer ID,简称PID)和序列号(Sequence Number,简称SeqNumber)来实现消息不重复。PID是Broker分配给生产者的唯一标识,SeqNumber是针对每个分区自增的序列号。当生产者发送消息时,会携带PID和SeqNumber,Broker收到后会检查是否已存在相同PID和SeqNumber的消息,如果存在则丢弃,否则追加写入。这样就完美解决了生产者重试导致的消息重复问题。
来自博客园文章片段
开启幂等生产者非常简单,只需要在Producer配置文件中设置以下参数: props.put("enable.idempotence", "true"); 同时需要设置acks=all和retries>0,Kafka会自动处理其他依赖参数如max.in.flight.requests.per.connection=5。启用后,生产者内部会维护一个序列号生成器,对于同一个分区,序列号是递增的,即使重试也不会重复。
来自知乎专栏片段
为什么选择幂等生产者?因为它只解决生产者端的重复,不需要修改消费者代码,也无需Exactly-Once语义的复杂配置。在0.11版本引入后,成为推荐的at-least-once保障方式。测试时可以用同一个topic多次发送相同消息,观察offset不会重复递增。
来自阿里云开发者社区片段
幂等性的实现依赖于Broker端的TransactionStateLog,它记录了每个PID的最新序列号。当收到消息时,Broker会验证序列号是否为预期值(上一个+1),如果是则append,否则reject。PID有效期为24小时,过期后重新分配。
来自腾讯云开发者社区片段
注意事项:幂等生产者不支持事务生产者混合使用,一个PID只能用于幂等或事务,不能两者兼得。所有分区共享同一个deferred队列,重试时会等待所有in-flight请求完成,以保持序列号顺序。
来自掘金文章片段
实际代码示例: Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); KafkaProducer<String, String> producer = new KafkaProducer<>(props);
FAQ
Q: 幂等生产者会影响性能吗?
A: 轻微影响,主要因为acks=all和限流in-flight requests,但远低于启用事务的开销。
Q: PID过期后会丢失幂等性吗?
A: 是的,24小时后PID失效,需重启生产者或等待新PID分配,可能导致短暂重复。
Q: 它能防止消费者重复消费吗?
A: 不能,只保障生产端不重复,消费者仍需自己处理幂等。
Q: 支持多线程生产者吗?
A: 不推荐,序列号维护依赖单线程顺序,建议一个线程一个生产者实例。