消费者重复消费的原因
消费者已经消费了数据但是offset没有来得及提交,再次消费还是之前的offset导致重复消费。
可能原因
- 消费者宕机或者重启,导致已经消费没有提交offset。
- 再平衡(rebalance),kafka采用了消费者自动提交offset,在本次还没有提交的时候,有消费者节点的移除或者加入,发生了再平衡,再次消费的时候,消费者会根据之前提交的偏移量offset消费,即之前的消费者消费了消息但是还没有提交offset,这样变会导致重复消费。
重复消费的解决方法
1) 设置消费者自动提交offset为false
spring的配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest
api方法实现
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
2) 在业务上对消息进行唯一判断,已经消费了的消息不再消费。比如,每次push的时候将唯一值key记录下来,再pull消费的时候再比较这个值key。
3)实现幂等性解决kafka重复消费
一个数据或者一个请求重复执行多次,得保证数据不能出错,多次执行的效果是一样的。
比如第三方的支付接口,它会重复调用三次,当第一次我们update成功了,后面2次的请求变可以不再处理。
操作方法
- 先根据主键查询有没有这条数据,有则insert没有则update
- 全局唯一id,根据唯一id判断消息是否消费过了,消费了则不管,没有消费则处理,这个和第二种方法一样。
4) 新版kafka开启幂等性
在kafka 0.11.0版本后可以开启kafka的幂等性,它能保证单个partition分区上的幂等性,即单个分区可以保证不重复消费。
java api实现方法
props.put("enable.idempotence", ture)
//或者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
在多个分区中,保证不重复消费,需要开启kafka的事务。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
总结
使用新版kafka的配置来解决重复消费的问题,或者自己实现幂等性解决kafka中的重复消费。