Open
Description
Description
We are running trying to write to kafka from a spark job. We are using confluent-kafka
to write the data.
We get data as RDD and when we try to do: partition.flush()
it throws exception saying: ImportError: No module named 'cimpl'
The contents of confluent-kafka
that we have installed are:
/usr/local/lib/python3.5/dist-packages/confluent_kafka# ls
__init__.py __pycache__ admin avro cimpl.cpython-35m-x86_64-linux-gnu.so kafkatest
How to reproduce
pip3 install confluent-kafka
I tried doing these as well:
pip3 install confluent_kafka
pip3 install confluent-kafka==0.11.6
def write_rdd(self, rdd, topic):
def send_to_kafka(partition):
p = Producer({
'bootstrap.servers': self.brokers
})
i = 0
for data in partition:
p.produce(topic, data)
if i % 100 == 0:
p.poll(0)
if i % 1000 == 0:
p.flush() // Here is where it throws error
i = i + 1
p.flush()
rdd.foreachPartition(send_to_kafka)
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (
confluent_kafka.version()
andconfluent_kafka.libversion()
): - Apache Kafka broker version:
0.11.0.0
- Client configuration:
{...}
- Operating system:
Debian 9.9 stretch
- Provide client logs (with
'debug': '..'
as necessary)
<module 'src.jobs.kafka_writer' from '/tmp/write_customer_summary_to_kafka_gofood_92470344/jobs.zip/src/jobs/kafka_writer/__init__.py'>
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 148, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.5/pickle.py", line 408, in dump
self.save(obj)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 744, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
save(x)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 801, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 774, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 801, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 297, in save_function_tuple
save(f_globals)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.5/pickle.py", line 845, in _batch_setitems
save(v)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 391, in save_global
__import__(modname)
ImportError: No module named 'cimpl'
- Provide broker log excerpts
- Critical issue