Consumer.consume commits offsets when 'enable.auto.commit': False #1299

Open
@whobscr

Description

When I disable auto commit with 'enable.auto.commit': False and call poll(), the consumer does not commit offsets, which matches the docs. With consume(), however, offsets appear to be committed after every call, which is unexpected.
Consumer.close() is not called during execution.
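
One way to check whether anything was actually committed is to compare the group's committed offset with the partition watermarks and the consumer's in-memory position. The following is a minimal sketch, not part of the original report: `offset_snapshot` is a hypothetical helper name, and it assumes `consumer` is a `confluent_kafka.Consumer` and `tp` a `confluent_kafka.TopicPartition` supplied by the caller. Note that `get_watermark_offsets` reports the low and high offsets of the partition log, while `committed` reports what is stored for the `group.id`.

```python
def offset_snapshot(consumer, tp, timeout=10.0):
    """Collect watermark, committed, and position offsets for one partition.

    Hypothetical helper: `consumer` is assumed to be a confluent_kafka.Consumer
    and `tp` a confluent_kafka.TopicPartition, both created by the caller.
    """
    # Low/high watermarks describe the partition log, not the consumer group.
    # The call may return None if the broker does not answer within `timeout`.
    watermarks = consumer.get_watermark_offsets(tp, timeout=timeout)
    low, high = watermarks if watermarks is not None else (None, None)

    # Offset last committed for this consumer's group.id
    # (a negative sentinel value if nothing has been committed).
    committed = consumer.committed([tp], timeout=timeout)[0].offset

    # Next offset this consumer instance would fetch
    # (a negative sentinel value if the partition is not being consumed yet).
    position = consumer.position([tp])[0].offset

    return {"low": low, "high": high, "committed": committed, "position": position}
```

Calling this before and after a `consume()` call and comparing the `committed` field should show whether a commit really happened; a change in `position` or `high` alone would not indicate a commit.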

How to reproduce

    from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

    # `config` and `logger` come from the surrounding application.
    consumer = Consumer({'bootstrap.servers': 'server:9092',
                         'group.id': 'group_id',
                         'enable.auto.commit': False,
                         'auto.offset.reset': 'earliest'})
    print(str(consumer.get_watermark_offsets(TopicPartition('topic_test', 0))))

    try:
        consumer.subscribe([config.KAFKA_INPUT_TOPIC])

        while True:
            # Fetch up to KAFKA_MAX_BATCH_SIZE messages, waiting at most 10 seconds.
            messages = consumer.consume(num_messages=config.KAFKA_MAX_BATCH_SIZE, timeout=10.0)
            if messages:
                for msg in messages:
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            # End of partition event
                            logger.error('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                        else:
                            raise KafkaException(msg.error())
                    else:
                        print('message ready')

            # Printed after every batch to compare with the value printed at startup.
            print(str(consumer.get_watermark_offsets(TopicPartition('topic_test', 0))))
            # consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

Execution result: the first call to get_watermark_offsets returns a different value from all subsequent calls. I expect the values to be the same. I need to fetch a batch of messages and process them concurrently, but it seems that batch consumption commits the offsets before processing, which may result in data loss.
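
For the batch use case described above, the intent is to commit only after a batch has been fully processed. A minimal sketch of that idea follows, under stated assumptions: `offsets_to_commit` is a hypothetical helper (not a library function), and it relies only on the `error()`, `topic()`, `partition()`, and `offset()` accessors that messages returned by `consume()` provide. It computes, per partition, the offset of the next message to read, which is the value Kafka expects in a commit.

```python
def offsets_to_commit(messages):
    """Map (topic, partition) -> next offset to commit for a processed batch.

    Hypothetical helper: `messages` is assumed to be the list returned by
    Consumer.consume(), after every message in it has been processed.
    """
    next_offsets = {}
    for msg in messages:
        if msg.error():
            # Error events (for example partition EOF) are not records to acknowledge.
            continue
        key = (msg.topic(), msg.partition())
        # A committed offset names the next message to read, hence the +1.
        next_offsets[key] = max(next_offsets.get(key, 0), msg.offset() + 1)
    return next_offsets


# Possible usage once the whole batch has finished processing
# (assumes `consumer` and `messages` from the reproduction code):
#
#   from confluent_kafka import TopicPartition
#   offsets = [TopicPartition(topic, partition, offset)
#              for (topic, partition), offset in offsets_to_commit(messages).items()]
#   if offsets:
#       consumer.commit(offsets=offsets, asynchronous=False)
```

When messages are processed concurrently, the commit should wait until every message in the batch is done; otherwise a crash could skip records that were fetched but never processed.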

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.8.2
  • Apache Kafka broker version: 2.11
  • Operating system: CentOS Linux release 7.9.2009

Metadata

Labels: question (A question about how to use or about expected behavior of the library)
