Description
There appears to be a bug in how the SchemaRegistryClient (confluent_kafka v2.8.0) caches subject entries in the SchemaRegistryClient.get_schema() method. When only a schema_id is passed, it can add a subject_name=None entry to the cache, and this entry is never invalidated when the subject is permanently deleted.
Steps to reproduce:
- Created a SchemaRegistryClient instance and inspected the empty cache
In [1]: print(client._cache.schema_id_index); print(client._cache.schema_index)
defaultdict(<class 'dict'>, {})
defaultdict(<class 'dict'>, {})
- Registered a schema; the cache now correctly maps the subject name to the schema and schema-id
In [3]: client.register_schema(subject_name, Schema(schema_str, 'AVRO'))
Out[3]: 8636
In [4]: print(client._cache.schema_id_index); print(client._cache.schema_index)
defaultdict(<class 'dict'>, {'testkafka.test_librdkafka_v2.8.0.schema_registry_issue_tests.on_version_2.8.0.release_notes_specific.test_schema_deletion_apis': {8636: Schema(schema_str=None, schema_type='AVRO', references=[], metadata=None, rule_set=None)}})
defaultdict(<class 'dict'>, {'testkafka.test_librdkafka_v2.8.0.schema_registry_issue_tests.on_version_2.8.0.release_notes_specific.test_schema_deletion_apis': {Schema(schema_str=None, schema_type='AVRO', references=[], metadata=None, rule_set=None): 8636}})
- Queried the schema by schema-id. The updated cache now contains a None entry (in place of the subject name), added by get_schema().
In [5]: client.get_schema(8636)
Out[5]: Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None)
In [6]: print(client._cache.schema_id_index); print(client._cache.schema_index)
defaultdict(<class 'dict'>, {'testkafka.test_librdkafka_v2.8.0.schema_registry_issue_tests.on_version_2.8.0.release_notes_specific.test_schema_deletion_apis': {8636: Schema(schema_str=None, schema_type='AVRO', references=[], metadata=None, rule_set=None)}, None: {8636: Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None)}})
defaultdict(<class 'dict'>, {'testkafka.test_librdkafka_v2.8.0.schema_registry_issue_tests.on_version_2.8.0.release_notes_specific.test_schema_deletion_apis': {Schema(schema_str=None, schema_type='AVRO', references=[], metadata=None, rule_set=None): 8636}, None: {Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None): 8636}})
- On permanent subject deletion, the client correctly removes the entry for the subject name, but not the None entry.
In [7]: client.delete_subject(subject_name, permanent=False)
Out[7]: [1]
In [8]: client.delete_subject(subject_name, permanent=True)
Out[8]: [1]
In [9]: print(client._cache.schema_id_index); print(client._cache.schema_index)
defaultdict(<class 'dict'>, {None: {8636: Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None)}})
defaultdict(<class 'dict'>, {None: {Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None): 8636}})
- On further lookups with get_schema() (when no subject_name is passed), we now read from the stale cached None entry. Note that this was the only subject for this schema-id, so deleting the subject should also have deleted the schema itself.
In [10]: client.get_schema(8636)
Out[10]: Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None)
However, the schema and schema-id were actually deleted (verified with a direct REST call):
{'error_code': 40403,
'message': 'Schema 8636 not found io.confluent.rest.exceptions.RestNotFoundException: Schema 8636 not found\nio.confluent.rest.exceptions.RestNotFoundException: Schema 8636 not found\n\tat io.confluent.kafka.schemaregistry.rest.exceptions.Errors.schemaNotFoundException(Errors.java:129)\n\tat io.confluent.kafka.schemaregistry.rest.resources.SchemasResource.getSchema(SchemasResource.java:168)\n\tat jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)\n
...
Impact:
Since the None cache entry is never invalidated or deleted, an active SchemaRegistryClient instance will always hold a dangling reference to an updated/deleted schema, and get_schema(schema_id) (called without a subject_name) can return this stale cache entry.
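The dangling-reference pattern can be reproduced without a live registry by modeling the cache as nested dicts. This is a minimal sketch: the `schema_id_index` layout mirrors the inspection output above, but the subject name and schema string here are placeholders, not the library's internals.

```python
from collections import defaultdict

# Toy model of the client-side cache: subject_name -> {schema_id: schema_str}
schema_id_index = defaultdict(dict)

# register_schema(subject, ...) caches under the real subject name
schema_id_index["my-subject"][8636] = "schema-v1"

# get_schema(8636) with no subject_name caches under the key None
schema_id_index[None][8636] = "schema-v1"

# delete_subject("my-subject", permanent=True) removes only the subject key
schema_id_index.pop("my-subject", None)

# The None entry survives, so a get_schema(8636) lookup still "succeeds"
# from cache even though the schema no longer exists server-side.
assert "my-subject" not in schema_id_index
assert schema_id_index[None][8636] == "schema-v1"
```

This matches the In [9]/In [10] behavior above: the subject key is gone, but the None key still serves the deleted schema.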
Caching issue with get_latest_version()
Also noting a behavior of the get_latest_version() method when using the LRUCache for _latest_version_cache: the subject name is stored as the cache key when get_latest_version() is called. This means that when a new schema version is registered, the old cached reference (to version 1) is never invalidated or updated, and get_latest_version() keeps returning the stale value from the cache.
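The stale-read pattern can be sketched with the LRU cache modeled as a plain dict keyed by subject name. The function and variable names here are illustrative, not the library's actual internals.

```python
# Toy model of _latest_version_cache: subject_name -> (schema_id, version)
latest_version_cache = {}

def get_latest_version(subject, registry):
    # Cache hit returns whatever was stored first; nothing ever
    # invalidates or refreshes the entry for this subject.
    if subject not in latest_version_cache:
        latest_version_cache[subject] = registry[subject]
    return latest_version_cache[subject]

registry = {"my-subject": (8632, 1)}   # server state after the first register
first = get_latest_version("my-subject", registry)

registry["my-subject"] = (8633, 2)     # a new version is registered
second = get_latest_version("my-subject", registry)

# Both calls return version 1: the cache key (the subject name) never
# changes, so the entry is never refreshed.
assert first == second == (8632, 1)
```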
In [2]: schema_str
Out[2]: '{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}'
In [3]: client.register_schema(subject_name, Schema(schema_str, 'AVRO'))
Out[3]: 8632
In [4]: client.get_latest_version(subject_name)
Out[4]: RegisteredSchema(schema_id=8632, schema=Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None), subject='testkafka.test_librdkafka_v2.8.0.schema_registry_issue_tests.on_version_2.8.0.release_notes_specific.test_schema_deletion_apis', version=1)
In [6]: schema_str
Out[6]: '{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"},{"name":"hobby","type":["null","string"],"default":"null"}]}'
In [7]: client.register_schema(subject_name, Schema(schema_str, 'AVRO'))
Out[7]: 8633
# This gives incorrect output: the stale version 1 is returned from cache
In [8]: client.get_latest_version(subject_name)
Out[8]: RegisteredSchema(schema_id=8632, schema=Schema(schema_str='{"type":"record","name":"value_record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"money","type":"float"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None), subject='testkafka.test_librdkafka_v2.8.0.schema_registry_issue_tests.on_version_2.8.0.release_notes_specific.test_schema_deletion_apis', version=1)
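One possible mitigation, sketched against the same toy model: invalidate the subject's cached latest version whenever a register call for that subject succeeds, so the next lookup refetches from the registry. This is a suggestion only, not the library's current behavior, and all names here are hypothetical.

```python
latest_version_cache = {"my-subject": (8632, 1)}   # stale entry from earlier
registry = {"my-subject": (8632, 1)}               # toy server state

def register_schema(subject, schema_id, version):
    registry[subject] = (schema_id, version)
    # Invalidate the cached latest version so the next lookup refetches.
    latest_version_cache.pop(subject, None)

def get_latest_version(subject):
    if subject not in latest_version_cache:
        latest_version_cache[subject] = registry[subject]
    return latest_version_cache[subject]

register_schema("my-subject", 8633, 2)
# The stale entry was dropped, so the lookup now sees the new version.
assert get_latest_version("my-subject") == (8633, 2)
```

A similar invalidation hook on delete_subject() could also clear the None-keyed entries from the first bug.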
Could someone take a look at the two caching bugs above, and see whether the SchemaRegistryClient cache can be made more robust?