From 3e3f17ae3e7037e847fd244df54cda3e20cb366e Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 3 Jan 2025 11:35:51 -0800 Subject: [PATCH 1/6] Ensure algorithm query param is passed for CSFLE (#1889) * Add missing algorithm query param * Add test --- .../rules/cel/cel_field_executor.py | 2 +- .../dek_registry/dek_registry_client.py | 2 +- tests/schema_registry/test_avro_serdes.py | 68 +++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/confluent_kafka/schema_registry/rules/cel/cel_field_executor.py b/src/confluent_kafka/schema_registry/rules/cel/cel_field_executor.py index 704afcc9b..b040d13ed 100644 --- a/src/confluent_kafka/schema_registry/rules/cel/cel_field_executor.py +++ b/src/confluent_kafka/schema_registry/rules/cel/cel_field_executor.py @@ -45,7 +45,7 @@ def _field_transform(self, ctx: RuleContext, field_ctx: FieldContext, field_valu "name": field_ctx.name, "typeName": field_ctx.type_name(), "tags": [celtypes.StringType(tag) for tag in field_ctx.tags], - "message": msg_to_cel(field_value), + "message": msg_to_cel(field_ctx.containing_message), } return self._executor.execute(ctx, field_value, args) diff --git a/src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py b/src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py index 593cff151..8f99af4fd 100644 --- a/src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py +++ b/src/confluent_kafka/schema_registry/rules/encryption/dek_registry/dek_registry_client.py @@ -632,7 +632,7 @@ def get_dek( if dek is not None: return dek - query = {'deleted': deleted} + query = {'algorithm': algorithm, 'deleted': deleted} response = self._rest_client.get('/dek-registry/v1/keks/{}/deks/{}/versions/{}' .format(urllib.parse.quote(kek_name), urllib.parse.quote(subject, safe=''), diff --git a/tests/schema_registry/test_avro_serdes.py b/tests/schema_registry/test_avro_serdes.py index 9f039feaf..ce7c8839b 100644 --- a/tests/schema_registry/test_avro_serdes.py +++ b/tests/schema_registry/test_avro_serdes.py @@ -929,6 +929,74 @@ def test_avro_encryption(): assert obj == obj2 +def test_avro_encryption_deterministic(): + executor = FieldEncryptionExecutor.register_with_clock(FakeClock()) + + conf = {'url': _BASE_URL} + client = SchemaRegistryClient.new_client(conf) + ser_conf = {'auto.register.schemas': False, 'use.latest.version': True} + rule_conf = {'secret': 'mysecret'} + schema = { + 'type': 'record', + 'name': 'test', + 'fields': [ + {'name': 'intField', 'type': 'int'}, + {'name': 'doubleField', 'type': 'double'}, + {'name': 'stringField', 'type': 'string', 'confluent:tags': ['PII']}, + {'name': 'booleanField', 'type': 'boolean'}, + {'name': 'bytesField', 'type': 'bytes', 'confluent:tags': ['PII']}, + ] + } + + rule = Rule( + "test-encrypt", + "", + RuleKind.TRANSFORM, + RuleMode.WRITEREAD, + "ENCRYPT", + ["PII"], + RuleParams({ + "encrypt.kek.name": "kek1", + "encrypt.kms.type": "local-kms", + "encrypt.kms.key.id": "mykey", + "encrypt.dek.algorithm": "AES256_SIV" + }), + None, + None, + "ERROR,NONE", + False + ) + client.register_schema(_SUBJECT, Schema( + json.dumps(schema), + "AVRO", + [], + None, + RuleSet(None, [rule]) + )) + + obj = { + 'intField': 123, + 'doubleField': 45.67, + 'stringField': 'hi', + 'booleanField': True, + 'bytesField': b'foobar', + } + ser = AvroSerializer(client, schema_str=None, conf=ser_conf, rule_conf=rule_conf) + dek_client = executor.client + ser_ctx = SerializationContext(_TOPIC, 
+                                       MessageField.VALUE)
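+    # AES256_SIV is a deterministic encryption algorithm; serializing with it
+    # exercises the 'algorithm' query param added by this fix, since without
+    # the param the DEK registry lookup assumes the default AES256_GCM DEK.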
- artifact push project artifacts/confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz --destination confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz - echo Thank you + +after_pipeline: + task: + agent: + machine: + type: s1-prod-ubuntu20-04-amd64-1 + jobs: + - name: SonarQube + commands: + - checkout + - sem-version java 11 + - artifact pull workflow coverage.xml + - emit-sonarqube-data --run_only_sonar_scan diff --git a/requirements/requirements-tests.txt b/requirements/requirements-tests.txt index 92cc08b37..ecb3d68d2 100644 --- a/requirements/requirements-tests.txt +++ b/requirements/requirements-tests.txt @@ -6,3 +6,4 @@ pytest pytest-timeout requests-mock respx +pytest_cov diff --git a/service.yml b/service.yml index 971ef7710..b792fb913 100644 --- a/service.yml +++ b/service.yml @@ -6,6 +6,12 @@ git: github: enable: true repo_name: confluentinc/confluent-kafka-python +sonarqube: + enable: true + coverage_exclusions: + - "**/*.pb.*" + - "**/mk-include/**/*" + - "examples/**" semaphore: enable: true pipeline_enable: false diff --git a/sonar-project.properties b/sonar-project.properties index dd3df6052..77fe6adba 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -1,7 +1,7 @@ ### service-bot sonarqube plugin managed file sonar.coverage.exclusions=**/test/**/*,**/tests/**/*,**/mock/**/*,**/mocks/**/*,**/*mock*,**/*test* sonar.cpd.exclusions=**/test/**/*,**/tests/**/*,**/mock/**/*,**/mocks/**/*,**/*mock*,**/*test* -sonar.exclusions=**/*.pb.*,**/mk-include/**/* +sonar.exclusions=**/*.pb.*,**/mk-include/**/*,examples/** sonar.language=python sonar.projectKey=confluent-kafka-python sonar.python.coverage.reportPaths=**/coverage.xml diff --git a/tests/README.md b/tests/README.md index 1f1f46ced..f3d436193 100644 --- a/tests/README.md +++ b/tests/README.md @@ -123,3 +123,10 @@ Tox can be used to test against various supported Python versions (py27, py36, p ```$ ./tests/run.sh tox``` + +### Running with test coverage +From top-level directory: +``` +pip install tox +./tests/run.sh tox -e cover +``` diff --git a/tox.ini b/tox.ini index 55c786e54..7b385b286 100644 --- a/tox.ini +++ b/tox.ini @@ -2,13 +2,13 @@ envlist = flake8,py37,py38,py39,py310,py311,py312,py313 [testenv] +deps = + -r requirements/requirements-tests-install.txt + . passenv = #http://tox.readthedocs.io/en/latest/config.html#confval-passenv=SPACE-SEPARATED-GLOBNAMES * commands = - # Install main package and all sub-packages - pip install -r requirements/requirements-tests-install.txt - pip install . # Early verification that module is loadable python -c 'import confluent_kafka ; print(confluent_kafka.version())' # Run tests (large timeout to allow docker image downloads) @@ -20,6 +20,16 @@ commands = deps = flake8 commands = flake8 +[testenv:cover] +commands = + # Install source with editable flag (-e) so that SonarQube can parse the coverage report. + # Otherwise, the report shows source files located in site-packages, which SonarQube cannot find. + # Example: ".tox/cover/lib/python3.11/site-packages/confluent_kafka/__init__.py" + # instead of "src/confluent_kafka/__init__.py" + pip install -e . 
+ pytest {env:PYTESTARGS:} --cov confluent_kafka --cov-report term --cov-report html --cov-report xml \ + --cov-branch --junitxml=test-report.xml tests/ {posargs} + [pytest] python_files = test_* testpaths = tests From 6c585347b7d31b094b7d3b8dbdc5efb9b7fbe2ea Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 6 Jan 2025 16:48:41 -0800 Subject: [PATCH 4/6] DGS-19492 Handle records nested in arrays/maps when searching for tags (#1890) * Handle records nested in arrays/maps when searching for tags * Fix formatting --- src/confluent_kafka/schema_registry/avro.py | 6 +- tests/schema_registry/test_avro_serdes.py | 75 +++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index 7a4a48319..6c800f977 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -752,7 +752,11 @@ def _get_inline_tags_recursively( return else: schema_type = schema.get("type") - if schema_type == 'record': + if schema_type == 'array': + _get_inline_tags_recursively(ns, name, schema.get("items"), tags) + elif schema_type == 'map': + _get_inline_tags_recursively(ns, name, schema.get("values"), tags) + elif schema_type == 'record': record_ns = schema.get("namespace") record_name = schema.get("name") if record_ns is None: diff --git a/tests/schema_registry/test_avro_serdes.py b/tests/schema_registry/test_avro_serdes.py index ce7c8839b..aeb58d984 100644 --- a/tests/schema_registry/test_avro_serdes.py +++ b/tests/schema_registry/test_avro_serdes.py @@ -757,6 +757,81 @@ def test_avro_cel_field_transform_complex_with_none(): assert obj2 == newobj +def test_avro_cel_field_transform_complex_nested(): + conf = {'url': _BASE_URL} + client = SchemaRegistryClient.new_client(conf) + ser_conf = {'auto.register.schemas': False, 'use.latest.version': True} + schema = { + 'type': 'record', + 'name': 'UnionTest', + 'namespace': 'test', + 'fields': [ + { + 'name': 'emails', + 'type': [ + 'null', + { + 'type': 'array', + 'items': { + 'type': 'record', + 'name': 'Email', + 'fields': [ + { + 'name': 'email', + 'type': [ + 'null', + 'string' + ], + 'doc': 'Email address', + 'confluent:tags': [ + 'PII' + ] + } + ] + } + } + ], + 'doc': 'Communication Email', + } + ] + } + + rule = Rule( + "test-cel", + "", + RuleKind.TRANSFORM, + RuleMode.WRITE, + "CEL_FIELD", + None, + None, + "typeName == 'STRING' ; value + '-suffix'", + None, + None, + False + ) + client.register_schema(_SUBJECT, Schema( + json.dumps(schema), + "AVRO", + [], + None, + RuleSet(None, [rule]) + )) + + obj = { + 'emails': [{'email': 'john@acme.com'}] + } + ser = AvroSerializer(client, schema_str=None, conf=ser_conf) + ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE) + obj_bytes = ser(obj, ser_ctx) + + obj2 = { + 'emails': [{'email': 'john@acme.com-suffix'}] + } + deser = AvroDeserializer(client) + newobj = deser(obj_bytes, ser_ctx) + assert obj2 == newobj + + def test_avro_cel_field_condition(): conf = {'url': _BASE_URL} client = SchemaRegistryClient.new_client(conf) From e36d719c177d9d839474352941ba2180e2c14ad9 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Tue, 7 Jan 2025 23:50:25 +0530 Subject: [PATCH 5/6] Changes for v2.8.0 release (#1891) --- .semaphore/semaphore.yml | 58 +++++++++++------------ CHANGELOG.md | 10 ++++ MANIFEST.in | 1 + docs/conf.py | 2 +- examples/docker/Dockerfile.alpine | 2 +- pyproject.toml | 3 +- 
src/confluent_kafka/src/confluent_kafka.h | 14 +++--- 7 files changed, 50 insertions(+), 40 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 4f7c5c33c..00c34cf85 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -8,28 +8,28 @@ execution_time_limit: global_job_config: env_vars: - name: LIBRDKAFKA_VERSION - value: v2.6.1 + value: v2.8.0 prologue: commands: - checkout - mkdir artifacts blocks: - - name: Test - dependencies: [] - run: - # don't run the tests on non-functional changes... - when: "change_in('/', {exclude: ['.github/']})" - task: - jobs: - - name: Test - commands: - - sem-version python 3.9 - - pip install tox - - tox -e cover - - mkdir test-output - - cp test-report.xml test-output - - test-results publish test-output - - artifact push workflow coverage.xml + # - name: Test + # dependencies: [] + # run: + # # don't run the tests on non-functional changes... + # when: "change_in('/', {exclude: ['.github/']})" + # task: + # jobs: + # - name: Test + # commands: + # - sem-version python 3.9 + # - pip install tox + # - tox -e cover + # - mkdir test-output + # - cp test-report.xml test-output + # - test-results publish test-output + # - artifact push workflow coverage.xml - name: "Wheels: OSX x64 - Python 3.6-3.12" run: when: "tag =~ '.*'" @@ -302,15 +302,15 @@ blocks: - artifact push project artifacts/confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz --destination confluent-kafka-python-wheels-${SEMAPHORE_GIT_TAG_NAME}-${SEMAPHORE_WORKFLOW_ID}.tgz - echo Thank you -after_pipeline: - task: - agent: - machine: - type: s1-prod-ubuntu20-04-amd64-1 - jobs: - - name: SonarQube - commands: - - checkout - - sem-version java 11 - - artifact pull workflow coverage.xml - - emit-sonarqube-data --run_only_sonar_scan +# after_pipeline: +# task: +# agent: +# machine: +# type: s1-prod-ubuntu20-04-amd64-1 +# jobs: +# - name: SonarQube +# commands: +# - checkout +# - sem-version java 11 +# - artifact pull workflow coverage.xml +# - emit-sonarqube-data --run_only_sonar_scan diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ea767353..a7ff292d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Confluent's Python client for Apache Kafka +## v2.8.0 + +v2.8.0 is a feature release with the features, fixes and enhancements: + +confluent-kafka-python v2.8.0 is based on librdkafka v2.8.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.8.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. + + ## v2.7.0 v2.7.0 is a feature release with the features, fixes and enhancements present in v2.6.2 including the following fix: @@ -10,6 +19,7 @@ confluent-kafka-python v2.7.0 is based on librdkafka v2.6.1, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.1) for a complete list of changes, enhancements, fixes and upgrade considerations. + ## v2.6.2 > [!WARNING] diff --git a/MANIFEST.in b/MANIFEST.in index 2f7a4818d..7e9bbf313 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ include README.md +include LICENSE include src/confluent_kafka/src/*.[ch] prune tests prune docs \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index 86bf02e0f..7dc3592fa 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -27,7 +27,7 @@ # built documents. # # The short X.Y version. -version = '2.7.0' +version = '2.8.0' # The full version, including alpha/beta/rc tags. 
release = version ###################################################################### diff --git a/examples/docker/Dockerfile.alpine b/examples/docker/Dockerfile.alpine index a3d421201..26c006bbd 100644 --- a/examples/docker/Dockerfile.alpine +++ b/examples/docker/Dockerfile.alpine @@ -30,7 +30,7 @@ FROM alpine:3.12 COPY . /usr/src/confluent-kafka-python -ENV LIBRDKAFKA_VERSION v2.6.1 +ENV LIBRDKAFKA_VERSION v2.8.0 ENV KAFKACAT_VERSION master diff --git a/pyproject.toml b/pyproject.toml index d18bf7e37..a4427bf95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "confluent-kafka" -version = "2.7.0" +version = "2.8.0" description = "Confluent's Python client for Apache Kafka" classifiers = [ "Development Status :: 5 - Production/Stable", @@ -14,7 +14,6 @@ classifiers = [ "Programming Language :: Python :: 3", "Topic :: Software Development :: Libraries :: Python Modules"] readme = "README.md" -license = { file = "LICENSE" } requires-python = ">=3.7" dynamic = ["dependencies", "optional-dependencies"] diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index a6c4162d5..5acbf8ed3 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -42,8 +42,8 @@ * 0xMMmmRRPP * MM=major, mm=minor, RR=revision, PP=patchlevel (not used) */ -#define CFL_VERSION 0x02070200 -#define CFL_VERSION_STR "2.7.0" +#define CFL_VERSION 0x02080000 +#define CFL_VERSION_STR "2.8.0" /** * Minimum required librdkafka version. This is checked both during @@ -51,19 +51,19 @@ * Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error * defines and strings in sync. */ -#define MIN_RD_KAFKA_VERSION 0x020601ff +#define MIN_RD_KAFKA_VERSION 0x020800ff #ifdef __APPLE__ -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.6.1 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.6.1 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION #ifdef __APPLE__ -#error "confluent-kafka-python requires librdkafka v2.6.1 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#error "confluent-kafka-python requires librdkafka v2.8.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#error "confluent-kafka-python requires librdkafka v2.6.1 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#error "confluent-kafka-python requires librdkafka v2.8.0 or later. 
Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #endif From 92c83e76d59e02171df86b01e6ee4f2c27f3ce0a Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 7 Jan 2025 10:41:50 -0800 Subject: [PATCH 6/6] Add comments to CSFLE consumer examples (#1893) --- examples/avro_consumer_encryption.py | 7 ++++++- examples/json_consumer_encryption.py | 7 ++++++- examples/protobuf_consumer_encryption.py | 7 ++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/examples/avro_consumer_encryption.py b/examples/avro_consumer_encryption.py index f5a931339..b49d7bdad 100644 --- a/examples/avro_consumer_encryption.py +++ b/examples/avro_consumer_encryption.py @@ -98,10 +98,15 @@ def main(args): sr_conf = {'url': args.schema_registry} schema_registry_client = SchemaRegistryClient(sr_conf) + rule_conf = None + # KMS credentials can be passed as follows + # rule_conf = {'secret.access.key': 'xxx', 'access.key.id': 'yyy'} + # Alternatively, the KMS credentials can be set via environment variables avro_deserializer = AvroDeserializer(schema_registry_client, schema_str, - dict_to_user) + dict_to_user, + rule_conf=rule_conf) consumer_conf = {'bootstrap.servers': args.bootstrap_servers, 'group.id': args.group, diff --git a/examples/json_consumer_encryption.py b/examples/json_consumer_encryption.py index 5b604b0dc..660fcc94e 100644 --- a/examples/json_consumer_encryption.py +++ b/examples/json_consumer_encryption.py @@ -95,10 +95,15 @@ def main(args): sr_conf = {'url': args.schema_registry} schema_registry_client = SchemaRegistryClient(sr_conf) + rule_conf = None + # KMS credentials can be passed as follows + # rule_conf = {'secret.access.key': 'xxx', 'access.key.id': 'yyy'} + # Alternatively, the KMS credentials can be set via environment variables json_deserializer = JSONDeserializer(schema_str, dict_to_user, - schema_registry_client) + schema_registry_client, + rule_conf=rule_conf) consumer_conf = {'bootstrap.servers': args.bootstrap_servers, 'group.id': args.group, diff --git a/examples/protobuf_consumer_encryption.py b/examples/protobuf_consumer_encryption.py index 94fc565a2..281868a13 100644 --- a/examples/protobuf_consumer_encryption.py +++ b/examples/protobuf_consumer_encryption.py @@ -69,10 +69,15 @@ def main(args): sr_conf = {'url': args.schema_registry} schema_registry_client = SchemaRegistryClient(sr_conf) + rule_conf = None + # KMS credentials can be passed as follows + # rule_conf = {'secret.access.key': 'xxx', 'access.key.id': 'yyy'} + # Alternatively, the KMS credentials can be set via environment variables protobuf_deserializer = ProtobufDeserializer(user_pb2.User, {'use.deprecated.format': False}, - schema_registry_client) + schema_registry_client, + rule_conf=rule_conf) consumer_conf = {'bootstrap.servers': args.bootstrap_servers, 'group.id': args.group,