Consumer.get_watermark_offsets Ignores Timeout Parameter in Some Situations #413

Open
@bpowers39

Description

Consumer.get_watermark_offsets ignores its timeout argument and blocks indefinitely when the Kafka broker that leads the selected partition is down. The call returns only once the broker is back up. I suspect this is a bug in librdkafka, but I'm posting it here first in case it's an issue with the Python bindings.

How to reproduce

Create a topic with one partition and run the example below. Once it has consumed a few messages, kill the broker hosting the partition. The call to get_watermark_offsets will then block until the broker comes back up, even though timeout=1 is passed.

from uuid import uuid4

from confluent_kafka import Consumer


def assigned(consumer, partitions):
    # Rebalance callback: report the partitions assigned to this consumer
    print("Assigned:", partitions)


if __name__ == '__main__':
    client = Consumer({'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()),
                       'default.topic.config': {'auto.offset.reset': 'smallest'}})
    client.subscribe(['ibbot'], on_assign=assigned)

    try:
        while True:
            msg = client.poll(timeout=1)
            for partition in client.assignment():
                # Expected to return within 1 second; blocks while the partition's broker is down
                print(client.get_watermark_offsets(partition, timeout=1))

            if msg is not None:
                if msg.error():
                    print("Error: ", msg.error())
                else:
                    print("Data")
    finally:
        # Close the consumer when the loop is interrupted (e.g. Ctrl+C)
        client.close()


Note that this example probably depends on a bug in librdkafka, so it may not reproduce 100% of the time. You can also reproduce it with a multi-partition, multi-broker setup. In that case, the function blocks only for as long as it takes a new leader to be elected.
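
Until the root cause is found, a caller-side guard might help keep the loop responsive. The sketch below is not part of confluent-kafka-python: call_with_deadline and fake_blocking_query are hypothetical names, and the approach assumes the blocking call releases the GIL so the main thread can keep running. It runs the call in a daemon thread and gives up waiting after a deadline.

```python
import threading
import time


def call_with_deadline(fn, *args, deadline_s=1.0, **kwargs):
    """Run fn(*args, **kwargs) in a daemon thread and wait at most deadline_s.

    Returns (True, result) if fn finished in time, (False, None) otherwise.
    On a miss the worker thread is left running in the background.
    """
    outcome = {}

    def worker():
        try:
            outcome["value"] = fn(*args, **kwargs)
        except Exception as exc:  # re-raised in the caller's thread below
            outcome["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(deadline_s)
    if thread.is_alive():
        return False, None
    if "error" in outcome:
        raise outcome["error"]
    return True, outcome["value"]


def fake_blocking_query(delay_s):
    # Hypothetical stand-in for a watermark query that does not honour its timeout
    time.sleep(delay_s)
    return (0, 42)
```

In the reproduction loop this would be used as call_with_deadline(client.get_watermark_offsets, partition, timeout=1, deadline_s=2). The stuck call is not cancelled, so repeated misses would accumulate blocked threads; this only masks the symptom. Passing cached=True to get_watermark_offsets, which reads locally cached offsets instead of querying the broker, may be another option if slightly stale values are acceptable.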

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent-kafka-python: ('0.11.4', 721920), librdkafka: ('0.11.4', 722175)
  • Apache Kafka broker version: 0.11.0.2
  • Client configuration: {'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()), 'default.topic.config': {'auto.offset.reset': 'smallest'}}
  • Operating system: RHEL 7
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

Metadata

Labels

  • component:librdkafka: For issues tied to the librdkafka elements
  • investigate further: It's unclear what the issue is at this time but there is enough interest to look into it
