kafka 自动提交 offset 失败:Auto offset commit failed




problem

今天在服务日志中观察数据的消费情况时,发现了一个如下的错误,而且每隔几秒就会出现一次,

[2020-12-30 14:33:13,996 consumer.py 539 ERROR 84613] Offset commit failed: This is likely to cause duplicate message delivery
Traceback (most recent call last):
  File "/home/kyfq/anaconda3/envs/algo-work/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 528, in _maybe_auto_commit_offsets_sync
    self.commit_offsets_sync(self._subscription.all_consumed_offsets())
  File "/home/kyfq/anaconda3/envs/algo-work/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 521, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

不难看出是自动提交 offset 失败了,我们都知道,kafka 的数据更新消费都是通过在 zookeeper 中标记一个偏移量(offset)来记录每个分区的消费位置,所以一旦 offset 更新失败,不难想象肯定会出现重复消费数据的问题!

通过以上信息分析大概意思是:kafka 消费者在处理消息时,在指定时间内(session.time.out)没有处理完,consumer coordinator 会由于没有接受到心跳而挂掉,导致自动提交 offset 失败,因此就会像日志中所说的发生 rebalanced(重平衡即重新分配 partition 给客户端),而之前提交的 offset 已经失败了,所以重新分配的客户端又会消费之前的数据,接着 consumer 重新消费,又出现了消费超时,无限循环下去。

solution

将 enable.auto.commit 设置成 false,即不采用自动提交方式;

由于使用了 spring-kafka,禁止 kafka-client 自动提交 offset,因为就是之前的自动提交失败,导致 offset 永远没更新,从而转向使用 spring-kafka 的 offset 提交机制。


  • 1)如果 auto.commit 关掉的话,spring-kafka 会启动一个 invoker,这个 invoker 的目的就是启动一个线程去消费数据,他消费的数据不是直接从 kafka 里面直接取的,那么他消费的数据从哪里来呢?他是从一个 spring-kafka 自己创建的阻塞队列里面取的。
  • 2)然后会进入一个循环,从源代码中可以看到如果 auto.commit 被关掉的话, 他会先把之前处理过的数据先进行提交 offset,然后再去从 kafka 里面取数据。
  • 3)然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka 会调用 kafka 的 pause 方法,则 consumer 会停止从 kafka 里面继续再拿数据。
  • 4)接着 spring-kafka 还会处理一些异常的情况,比如失败之后是不是需要 commit offset 这样的逻辑



reference