Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

json schema support #139

Merged
merged 7 commits into from
Dec 22, 2024
Merged

json schema support #139

merged 7 commits into from
Dec 22, 2024

Conversation

shortishly
Copy link
Contributor

@shortishly shortishly commented Dec 19, 2024

initial schema registry crate with json schema support using file:// or s3:// based registries

tansu-server/tests/person.rs has a valid and invalid message checking that invalid record is thrown

message schema used is at:

etc/schemas/person/key.json
etc/schemas/person/value.json

@shortishly shortishly linked an issue Dec 19, 2024 that may be closed by this pull request
@shortishly shortishly self-assigned this Dec 19, 2024
@shortishly shortishly changed the title using the "person" schema to validate a produced message json schema support Dec 19, 2024
@shortishly
Copy link
Contributor Author

startup with schema validation enabled:

❯ just tansu-server
./target/debug/tansu-server --kafka-cluster-id ${CLUSTER_ID} --kafka-advertised-listener-url tcp://${ADVERTISED_LISTENER} --schema file://./etc/schemas --storage-engine ${STORAGE_ENGINE} 2>&1 | tee tansu.log

create "person" topic:

❯ just person-topic-create         
kafka-topics --bootstrap-server localhost:9092 --partitions=3 --replication-factor=1 --create --topic person
Created topic person.

produce message that is valid for the person schema:

❯ just person-topic-produce-valid                   
echo 'h1:pqr,h2:jkl,h3:uio	{"code": "ABC-123"}	{"firstName": "John", "lastName": "Doe", "age": 21}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true

produce message that is invalid for the person schema:

❯ just person-topic-produce-invalid
echo 'h1:pqr,h2:jkl,h3:uio	{"code": "ABC-123"}	{"firstName": "John", "lastName": "Doe", "age": -1}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
[2024-12-19 11:51:28,412] ERROR Error when sending message to topic person with key: 19 bytes, value: 51 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.

From the server logs:

2024-12-19T11:51:28.407467Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 47: instance=Object {"code": String("ABC-123")}
2024-12-19T11:51:28.407524Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 56: r=()
2024-12-19T11:51:28.407546Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 40: validator=Some(Validator { root: SchemaNode { validators: Keyword, location: Location(""), absolute_path: Some(Uri { scheme: "https", authority: Some(Authority { userinfo: None, host: "example.com", host_parsed: RegName("example.com"), port: None }), path: "/person.schema.json", query: None, fragment: None }) }, config: CompilationConfig { draft: None, content_media_type: [], content_encoding: [] } }) encoded=Some(b"{\"firstName\": \"John\", \"lastName\": \"Doe\", \"age\": -1}")
2024-12-19T11:51:28.407589Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 47: instance=Object {"age": Number(-1), "firstName": String("John"), "lastName": String("Doe")}
2024-12-19T11:51:28.407626Z ERROR peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 51: err=ValidationError { instance: Number(-1), kind: Minimum { limit: Number(0) }, instance_path: Location("/age"), schema_path: Location("/properties/age/minimum") }
2024-12-19T11:51:28.407652Z ERROR peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 57: err=Api(InvalidRecord)
2024-12-19T11:51:28.407724Z ERROR peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_server::broker::produce: 75: err=Storage(Api(InvalidRecord))

The key validated, but the value was invalid.

@shortishly shortishly marked this pull request as ready for review December 22, 2024 11:20
@shortishly shortishly merged commit 09d8070 into main Dec 22, 2024
3 checks passed
@shortishly shortishly deleted the 138-schema-support branch December 22, 2024 16:41
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

schema support
1 participant