From 569c4cca3ad5e588d56e5b7c8b29eacab1cab7a1 Mon Sep 17 00:00:00 2001 From: Naxin Date: Thu, 4 Dec 2025 10:18:28 -0500 Subject: [PATCH 1/4] update --- .../_async/schema_registry_client.py | 2 ++ .../_sync/schema_registry_client.py | 28 ++++++++++++++----- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 00367dee9..d89932e13 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -31,6 +31,7 @@ from cachetools import Cache, LRUCache, TTLCache from httpx import Response +from confluent_kafka import version from confluent_kafka.schema_registry.common.schema_registry_client import ( RegisteredSchema, Schema, @@ -479,6 +480,7 @@ async def send_request( 'Content-Length': str(len(body_str)), 'Content-Type': "application/vnd.schemaregistry.v1+json", 'Confluent-Accept-Unknown-Properties': "true", + 'Confluent-Client-Version': f"python/{version()}" } if self.bearer_auth_credentials_source: diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 5c357060b..9e36d8508 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -31,14 +31,15 @@ from cachetools import Cache, LRUCache, TTLCache from httpx import Response +from confluent_kafka import version from confluent_kafka.schema_registry.common.schema_registry_client import ( RegisteredSchema, Schema, SchemaVersion, ServerConfig, _BearerFieldProvider, - _SchemaCache, _StaticFieldProvider, + _SchemaCache, full_jitter, is_retriable, is_success, @@ -441,7 +442,9 @@ def delete(self, url: str) -> Any: def put(self, url: str, body: Optional[dict] = None) -> Any: return self.send_request(url, method='PUT', body=body) - def send_request(self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None) -> Any: + def send_request( + self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None + ) -> Any: """ Sends HTTP request to the SchemaRegistry, trying each base URL in turn. @@ -477,6 +480,7 @@ def send_request(self, url: str, method: str, body: Optional[dict] = None, query 'Content-Length': str(len(body_str)), 'Content-Type': "application/vnd.schemaregistry.v1+json", 'Confluent-Accept-Unknown-Properties': "true", + 'Confluent-Client-Version': f"python/{version()}" } if self.bearer_auth_credentials_source: @@ -942,7 +946,9 @@ def lookup_schema( query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) - response = self._rest_client.post('subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request) + response = self._rest_client.post( + 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request + ) result = RegisteredSchema.from_dict(response) @@ -1043,7 +1049,9 @@ def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'R return registered_schema query = {'format': fmt} if fmt is not None else None - response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query) + response = self._rest_client.get( + 'subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query + ) registered_schema = RegisteredSchema.from_dict(response) @@ -1123,7 +1131,9 @@ def get_version( return registered_schema query: dict[str, Any] = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} - response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query) + response = self._rest_client.get( + 'subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query + ) registered_schema = RegisteredSchema.from_dict(response) @@ -1210,7 +1220,9 @@ def delete_version(self, subject_name: str, version: int, permanent: bool = Fals 'subjects/{}/versions/{}?permanent=true'.format(_urlencode(subject_name), version) ) else: - response = self._rest_client.delete('subjects/{}/versions/{}'.format(_urlencode(subject_name), version)) + response = self._rest_client.delete( + 'subjects/{}/versions/{}'.format(_urlencode(subject_name), version) + ) # Clear cache for both soft and hard deletes to maintain consistency self._cache.remove_by_subject_version(subject_name, version) @@ -1338,7 +1350,9 @@ def test_compatibility_all_versions( ) return response['is_compatible'] - def set_config(self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None) -> 'ServerConfig': + def set_config( + self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None + ) -> 'ServerConfig': """ Update global or subject config. From 7b5b1d2350467d14344e89ed02c0323189ad9d69 Mon Sep 17 00:00:00 2001 From: Naxin Date: Thu, 4 Dec 2025 10:30:03 -0500 Subject: [PATCH 2/4] revert some format changes --- .../_sync/schema_registry_client.py | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 9e36d8508..2ca5aaf00 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -442,9 +442,7 @@ def delete(self, url: str) -> Any: def put(self, url: str, body: Optional[dict] = None) -> Any: return self.send_request(url, method='PUT', body=body) - def send_request( - self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None - ) -> Any: + def send_request(self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None) -> Any: """ Sends HTTP request to the SchemaRegistry, trying each base URL in turn. @@ -946,9 +944,7 @@ def lookup_schema( query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) - response = self._rest_client.post( - 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request - ) + response = self._rest_client.post('subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request) result = RegisteredSchema.from_dict(response) @@ -1049,9 +1045,7 @@ def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'R return registered_schema query = {'format': fmt} if fmt is not None else None - response = self._rest_client.get( - 'subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query - ) + response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query) registered_schema = RegisteredSchema.from_dict(response) @@ -1131,9 +1125,7 @@ def get_version( return registered_schema query: dict[str, Any] = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} - response = self._rest_client.get( - 'subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query - ) + response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query) registered_schema = RegisteredSchema.from_dict(response) @@ -1220,9 +1212,7 @@ def delete_version(self, subject_name: str, version: int, permanent: bool = Fals 'subjects/{}/versions/{}?permanent=true'.format(_urlencode(subject_name), version) ) else: - response = self._rest_client.delete( - 'subjects/{}/versions/{}'.format(_urlencode(subject_name), version) - ) + response = self._rest_client.delete('subjects/{}/versions/{}'.format(_urlencode(subject_name), version)) # Clear cache for both soft and hard deletes to maintain consistency self._cache.remove_by_subject_version(subject_name, version) @@ -1350,9 +1340,7 @@ def test_compatibility_all_versions( ) return response['is_compatible'] - def set_config( - self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None - ) -> 'ServerConfig': + def set_config(self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None) -> 'ServerConfig': """ Update global or subject config. From bb4b06e139a1ad26b8283ca41611bebeeb6482ef Mon Sep 17 00:00:00 2001 From: Naxin Date: Thu, 4 Dec 2025 10:31:17 -0500 Subject: [PATCH 3/4] updaste --- .../schema_registry/_sync/schema_registry_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 2ca5aaf00..f55787a5e 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -38,8 +38,8 @@ SchemaVersion, ServerConfig, _BearerFieldProvider, - _StaticFieldProvider, _SchemaCache, + _StaticFieldProvider, full_jitter, is_retriable, is_success, From 17bf7da588237457225fd2eba3b601c6ec629568 Mon Sep 17 00:00:00 2001 From: Naxin Date: Thu, 4 Dec 2025 11:24:51 -0500 Subject: [PATCH 4/4] formatting --- .../schema_registry/_sync/schema_registry_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index f55787a5e..05d001eb9 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -478,7 +478,7 @@ def send_request(self, url: str, method: str, body: Optional[dict] = None, query 'Content-Length': str(len(body_str)), 'Content-Type': "application/vnd.schemaregistry.v1+json", 'Confluent-Accept-Unknown-Properties': "true", - 'Confluent-Client-Version': f"python/{version()}" + 'Confluent-Client-Version': f"python/{version()}", } if self.bearer_auth_credentials_source: