BufferError: Local: Queue Full #781

Closed
@luanmorenomaciel

Description

Error trying to send data using the Python library to Confluent 5.4. All the services are up and running, and even after trying several producer config options I keep getting a constant BufferError: Local: Queue Full.

I've been through the other links and recommendations but I'm still getting the errors. Is there any way this could be failing due to the advertised listener?
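For context, the queue limits in the reproduction below are set well under librdkafka's defaults (queue.buffering.max.messages defaults to 100000 in the 1.x clients), so the local queue can fill long before the broker is the bottleneck. A minimal sketch of the settings commonly raised for this error; the broker and registry addresses are placeholders, not the reporter's config:

    # hedged sketch: addresses are placeholders
    conf = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        # librdkafka default is 100000; the reproduction below caps it at 1000
        "queue.buffering.max.messages": 100000,
        # total size cap for locally queued messages, in kilobytes
        "queue.buffering.max.kbytes": 1048576,
    }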

How to reproduce

# imports assumed by this snippet; config, read_files, and Kafka are
# the reporter's local project modules
from random import randint

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def avro_producer(self):

    # avro schema [key] & [value]
    key_schema_str = config.key_schema_str
    value_schema_str = config.value_schema_str

    # load avro definition
    key_schema = avro.loads(key_schema_str)
    value_schema = avro.loads(value_schema_str)

    # get data to insert
    get_data = read_files.CSV().csv_reader()

    # init producer using key & value schema
    producer = AvroProducer(
        {
            # kafka broker server
            "bootstrap.servers": config.bootstrap_servers,
            # schema registry url
            "schema.registry.url": config.schema_registry_url,
            # max number of messages batched in one message set
            "batch.num.messages": 100,
            # delay in ms to wait for messages in queue
            "queue.buffering.max.ms": 100,
            # max number of messages on queue
            "queue.buffering.max.messages": 1000,
            # time to wait for messages to accumulate before sending a batch
            # (note: in librdkafka linger.ms is an alias of
            # queue.buffering.max.ms, so setting both is redundant)
            "linger.ms": 200
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema)

    # loop to insert data
    inserts = 0
    while inserts < len(get_data):

        # instantiate a new record object
        record = Kafka()

        try:

            # map columns and access using dict values
            record.genre = get_data[inserts]['genre']
            record.artist_name = get_data[inserts]['artist_name']
            record.track_name = get_data[inserts]['track_name']
            record.track_id = get_data[inserts]['track_id']
            record.popularity = get_data[inserts]['popularity']
            record.duration_ms = get_data[inserts]['duration_ms']

            # serve on_delivery callbacks from previous asynchronous produce() calls
            producer.poll(0)

            # the message passed to the delivery callback is already serialized;
            # pass the original object along to aid debugging
            producer.produce(
                topic=config.topic,
                key={'user_id': randint(0, 100000)},
                value=record.to_dict(),
                callback=lambda err, msg, obj=record: self.on_delivery(err, msg, obj)
            )
        except BufferError:
            print("buffer full")
            raise

        except ValueError:
            print("invalid input")
            raise

        except KeyboardInterrupt:
            raise

        # increment values
        inserts += 1

    print("flushing records...")

    # wait for delivery of all outstanding messages
    producer.flush()
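For comparison, the pattern commonly recommended for this error is to catch BufferError, serve delivery callbacks with poll() so the local queue drains, and then retry the produce() call. A minimal sketch; produce_with_retry is an illustrative helper, not part of the library:

    def produce_with_retry(producer, **kwargs):
        # retry produce() until the local queue has room for the message
        while True:
            try:
                producer.produce(**kwargs)
                return
            except BufferError:
                # queue is full: block up to 1 s serving delivery reports,
                # which frees queue space, then try again
                producer.poll(1)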

Checklist

Please provide the following information:

  • confluent-kafka[avro]==1.3.0

  • confluent-kafka==1.3.0

  • Client configuration:

        "bootstrap.servers": config.bootstrap_servers,
        "schema.registry.url": config.schema_registry_url,
        "batch.num.messages": 100,
        "queue.buffering.max.ms": 100,
        "queue.buffering.max.messages": 1000,
        "linger.ms": 200
    
