消费者重复消费的原因

消费者已经消费了数据但是offset没有来得及提交,再次消费还是之前的offset导致重复消费。

可能原因

  • 消费者宕机或者重启,导致已经消费没有提交offset。
  • 再平衡(rebalance),kafka采用了消费者自动提交offset,在本次还没有提交的时候,有消费者节点的移除或者加入,发生了再平衡,再次消费的时候,消费者会根据之前提交的偏移量offset消费,即之前的消费者消费了消息但是还没有提交offset,这样变会导致重复消费。

重复消费的解决方法

1) 设置消费者自动提交offset为false

spring的配置

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest
api方法实现
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
2) 在业务上对消息进行唯一判断,已经消费了的消息不再消费。

比如,每次push的时候将唯一值key记录下来,再pull消费的时候再比较这个值key。

3)实现幂等性解决kafka重复消费

一个数据或者一个请求重复执行多次,得保证数据不能出错,多次执行的效果是一样的。

比如第三方的支付接口,它会重复调用三次,当第一次我们update成功了,后面2次的请求变可以不再处理。

操作方法

  • 先根据主键查询有没有这条数据,有则insert没有则update
  • 全局唯一id,根据唯一id判断消息是否消费过了,消费了则不管,没有消费则处理,这个和第二种方法一样。

4) 新版kafka开启幂等性

在kafka 0.11.0版本后可以开启kafka的幂等性,它能保证单个partition分区上的幂等性,即单个分区可以保证不重复消费。

java api实现方法

props.put("enable.idempotence", ture)
//或者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true) 

在多个分区中,保证不重复消费,需要开启kafka的事务。

producer.initTransactions();
try {
     producer.beginTransaction();
     producer.send(record1);
     producer.send(record2);
     producer.commitTransaction();
} catch (KafkaException e) {
     producer.abortTransaction();
}

总结

使用新版kafka的配置来解决重复消费的问题,或者自己实现幂等性解决kafka中的重复消费。