Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

use single json schema for key and value #158

Merged
merged 6 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ resolver = "2"
members = [
"tansu-kafka-model",
"tansu-kafka-sans-io",
"tansu-proxy", "tansu-schema-registry",
"tansu-proxy",
"tansu-schema-registry",
"tansu-server",
"tansu-storage",
]
Expand Down Expand Up @@ -64,6 +65,8 @@ opentelemetry_sdk = "0.21.2"
pretty_assertions = "1"
prettyplease = "0.2.25"
proc-macro2 = "1.0.92"
protobuf-json-mapping = "3.7.1"
protobuf-parse = "3.7.1"
protobuf = { version = "3.7.1", features = ["with-bytes"] }
quote = "1.0"
rand = "0.8"
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ADD / /usr/src/
RUN cargo build --package ${PACKAGE} --release

RUN <<EOF
mkdir /image /image/schema
mkdir /image /image/schema /image/tmp

# copy any dynamically linked libaries used
for lib in $(ldd target/release/* 2>/dev/null|grep "=>"|awk '{print $3}'|sort|uniq); do
Expand All @@ -48,4 +48,5 @@ EOF

FROM scratch
COPY --from=builder /image /
ENV TMP=/tmp
ENTRYPOINT [ "/tansu-server" ]
98 changes: 85 additions & 13 deletions docs/schema-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,92 @@ When a message is produced to a topic with an associated schema,
the message is validated against the schema. If a message does not conform to its
schema it is rejected with an `INVALID_RECORD` error.

Within the schema registry the each topic has a directory containing `key.json`
(the schema for the message key) and/or `value.json` (the schema for the message value).
## JSON schema

An example JSON schema for the "person" topic:

```json
{
"title": "Person",
"type": "object",
"properties": {

"key": {
"type": "string",
"pattern": "^[A-Z]{3}-\\d{3}$"
},

"value": {
"type": "object",
"properties": {
"firstName": {
"type": "string",
"description": "The person's first name."
},
"lastName": {
"type": "string",
"description": "The person's last name."
},
"age": {
"description": "Age in years which must be equal to or greater than zero.",
"type": "integer",
"minimum": 0
}
}
}
}
}
```

As an example, for a topic "person" that has schemas for both the key and value,
the schema registry would be laid out as:
The schema must be an object, with properties for the message "key" and/or "value".

A schema that covers the message key, but allows any message value could look like:

```json
{
"title": "Person",
"type": "object",
"properties": {
"key": {
"type": "string",
"pattern": "^[A-Z]{3}-\\d{3}$"
},
}
}
```

```shell
person/key.json
person/value.json
A schema that allows any message key, but restricts the message value could look like:

```json
{
"title": "Person",
"type": "object",
"properties": {
"value": {
"type": "object",
"properties": {
"firstName": {
"type": "string",
"description": "The person's first name."
},
"lastName": {
"type": "string",
"description": "The person's last name."
},
"age": {
"description": "Age in years which must be equal to or greater than zero.",
"type": "integer",
"minimum": 0
}
}
}
}
}
```

The person schemas can be found in the `etc/schema` directory of the Tansu GitHub
repository. This directory is also used when starting Tansu using
the `just tansu-server` recipe or the Docker compose.
the `just tansu-server` recipe or Docker compose.

Starting Tansu with schema validation enabled:

Expand Down Expand Up @@ -49,14 +121,14 @@ Produce a message that is valid for the person schema:

```shell
❯ just person-topic-produce-valid
echo 'h1:pqr,h2:jkl,h3:uio {"code": "ABC-123"} {"firstName": "John", "lastName": "Doe", "age": 21}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
echo 'h1:pqr,h2:jkl,h3:uio "ABC-123" {"firstName": "John", "lastName": "Doe", "age": 21}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
```

Produce a message that is invalid for the person schema (the `age` must be greater to equal to 0):

```shell
❯ just person-topic-produce-invalid
echo 'h1:pqr,h2:jkl,h3:uio {"code": "ABC-123"} {"firstName": "John", "lastName": "Doe", "age": -1}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
echo 'h1:pqr,h2:jkl,h3:uio "ABC-123" {"firstName": "John", "lastName": "Doe", "age": -1}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
[2024-12-19 11:51:28,412] ERROR Error when sending message to topic person with key: 19 bytes, value: 51 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.
```
Expand All @@ -68,9 +140,9 @@ The server log contains the reason for the message being rejected:
2024-12-19T11:51:28.407524Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 56: r=()
2024-12-19T11:51:28.407546Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 40: validator=Some(Validator { root: SchemaNode { validators: Keyword, location: Location(""), absolute_path: Some(Uri { scheme: "https", authority: Some(Authority { userinfo: None, host: "example.com", host_parsed: RegName("example.com"), port: None }), path: "/person.schema.json", query: None, fragment: None }) }, config: CompilationConfig { draft: None, content_media_type: [], content_encoding: [] } }) encoded=Some(b"{\"firstName\": \"John\", \"lastName\": \"Doe\", \"age\": -1}")
2024-12-19T11:51:28.407589Z DEBUG peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 47: instance=Object {"age": Number(-1), "firstName": String("John"), "lastName": String("Doe")}
2024-12-19T11:51:28.407626Z ERROR peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 51: err=ValidationError { instance: Number(-1), kind: Minimum { limit: Number(0) }, instance_path: Location("/age"), schema_path: Location("/properties/age/minimum") }
2024-12-19T11:51:28.407652Z ERROR peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 57: err=Api(InvalidRecord)
2024-12-19T11:51:28.407724Z ERROR peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_server::broker::produce: 75: err=Storage(Api(InvalidRecord))
2024-12-19T11:51:28.407626Z WARN peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 51: err=ValidationError { instance: Number(-1), kind: Minimum { limit: Number(0) }, instance_path: Location("/age"), schema_path: Location("/properties/age/minimum") }
2024-12-19T11:51:28.407652Z WARN peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_schema_registry::json: 57: err=Api(InvalidRecord)
2024-12-19T11:51:28.407724Z WARN peer{addr=127.0.0.1:60095}:produce{api_key=0 api_version=11 correlation_id=5}: tansu_server::broker::produce: 75: err=Storage(Api(InvalidRecord))
```

[json-schema-org]: https://json-schema.org/
28 changes: 28 additions & 0 deletions etc/schema/person.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"title": "Person",
"type": "object",
"properties": {
"key": {
"type": "string",
"pattern": "^[A-Z]{3}-\\d{3}$"
},
"value": {
"type": "object",
"properties": {
"firstName": {
"type": "string",
"description": "The person's first name."
},
"lastName": {
"type": "string",
"description": "The person's last name."
},
"age": {
"description": "Age in years which must be equal to or greater than zero.",
"type": "integer",
"minimum": 0
}
}
}
}
}
12 changes: 0 additions & 12 deletions etc/schema/person/key.json

This file was deleted.

21 changes: 0 additions & 21 deletions etc/schema/person/value.json

This file was deleted.

18 changes: 15 additions & 3 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,23 @@ person-topic-create:
kafka-topics --bootstrap-server localhost:9092 --partitions=3 --replication-factor=1 --create --topic person

person-topic-produce-valid:
echo 'h1:pqr,h2:jkl,h3:uio {"code": "ABC-123"} {"firstName": "John", "lastName": "Doe", "age": 21}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true
echo 'h1:pqr,h2:jkl,h3:uio "ABC-123" {"firstName": "John", "lastName": "Doe", "age": 21}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true

person-topic-produce-invalid:
echo 'h1:pqr,h2:jkl,h3:uio {"code": "ABC-123"} {"firstName": "John", "lastName": "Doe", "age": -1}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true

echo 'h1:pqr,h2:jkl,h3:uio "ABC-123" {"firstName": "John", "lastName": "Doe", "age": -1}' | kafka-console-producer --bootstrap-server localhost:9092 --topic person --property parse.headers=true --property parse.key=true

person-topic-consume:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--consumer-property fetch.max.wait.ms=15000 \
--group person-consumer-group --topic person \
--from-beginning \
--property print.timestamp=true \
--property print.key=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.value=true

tansu-server:
./target/debug/tansu-server \
Expand Down
5 changes: 5 additions & 0 deletions tansu-schema-registry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@ version.workspace = true
license.workspace = true

[dependencies]
anyhow.workspace = true
bytes.workspace = true
jsonschema.workspace = true
object_store.workspace = true
protobuf-parse.workspace = true
protobuf.workspace = true
serde.workspace = true
serde_json.workspace = true
tansu-kafka-sans-io = { path = "../tansu-kafka-sans-io" }
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true

[dev-dependencies]
pretty_assertions.workspace = true
protobuf-json-mapping.workspace = true
tracing-subscriber.workspace = true


Expand Down
Loading
Loading