Closed
Description
Description
Error trying to send data using the Python library to Confluent 5.4. All the services are up and running, and even trying producer config options I'm getting a constant buffer pool exception.
I've been through the other links and recommendations but still getting the errors, is there any way this could be failing due the advertised listener?
How to reproduce
def avro_producer(self):
# avro schema [key] & [value]
key_schema_str = config.key_schema_str
value_schema_str = config.value_schema_str
# load avro definition
key_schema = avro.loads(key_schema_str)
value_schema = avro.loads(value_schema_str)
# get data to insert
get_data = read_files.CSV().csv_reader()
# init producer using key & value schema
producer = AvroProducer(
{
# kafka broker server
"bootstrap.servers": config.bootstrap_servers,
# schema registry url
"schema.registry.url": config.schema_registry_url,
# max number of messages batched in one message set
"batch.num.messages": 100,
# delay in ms to wait for messages in queue
"queue.buffering.max.ms": 100,
# max number of messages on queue
"queue.buffering.max.messages": 1000,
# wait messages in queue before send to brokers (batch)
"linger.ms": 200
},
default_key_schema=key_schema,
default_value_schema=value_schema)
# loop to insert data
inserts = 0
while inserts < len(get_data):
# instantiate new records, execute callbacks
record = Kafka()
try:
# map columns and access using dict values
record.genre = get_data[inserts]['genre']
record.artist_name = get_data[inserts]['artist_name']
record.track_name = get_data[inserts]['track_name']
record.track_id = get_data[inserts]['track_id']
record.popularity = get_data[inserts]['popularity']
record.duration_ms = get_data[inserts]['duration_ms']
# server on_delivery callbacks from previous asynchronous produce()
producer.poll(0)
# message passed to the delivery callback will already be serialized.
# to aid in debugging we provide the original object to the delivery callback.
producer.produce(
topic=config.topic,
key={'user_id': randint(0, 100000)},
value=record.to_dict(),
callback=lambda err, msg, obj=record: self.on_delivery(err, msg, obj)
)
except BufferError:
print("buffer full")
raise
except ValueError:
print("invalid input")
raise
except KeyboardInterrupt:
raise
# increment values
inserts += 1
print("flushing records...")
# buffer messages to send
producer.flush()
Checklist
Please provide the following information:
-
confluent-kafka[avro]==1.3.0
-
confluent-kafka==1.3.0
"bootstrap.servers": config.bootstrap_servers, "schema.registry.url": config.schema_registry_url, "batch.num.messages": 100, "queue.buffering.max.ms": 100, "queue.buffering.max.messages": 1000, "linger.ms": 200