Sample to show how to process different types of Avro subjects in a single topic #98


Open · wants to merge 1 commit into main

Conversation


@buddhike commented Apr 3, 2025

No description provided.

@buddhike requested a review from nicusX on April 3, 2025 05:37
@nicusX (Contributor) left a comment


Many thanks for the contribution.
I would suggest some changes to make it simpler to understand, and to avoid hinting at any not-so-good practices.

Contributor:

The .gitignore in the subfolder is not required.
There is a .gitignore at the top level; if anything is missing there, please update that one instead.

* Flink API: DataStream API
* Language: Java (11)

This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject types.
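As context for readers, here is a minimal, self-contained sketch (not taken from the PR; the class name is illustrative) of the Confluent Schema Registry wire format that makes multi-subject topics possible. Every serialized value starts with magic byte 0x0 followed by a 4-byte, big-endian schema id; the deserializer reads the id, fetches the writer's schema from the registry, and can then route the record to the matching generated class:

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the Confluent wire format header. A deserializer for
// a multi-subject topic first reads this header, then looks up the schema by
// id to decide which Avro record type to decode.
public class WireFormat {
    public static final byte MAGIC_BYTE = 0x0;

    // Extract the schema id from a Confluent-framed payload.
    public static int schemaId(byte[] payload) {
        if (payload.length < 5 || payload[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = ByteBuffer.allocate(5).put(MAGIC_BYTE).putInt(42).array();
        System.out.println(schemaId(framed)); // prints 42
    }
}
```

In the actual example this lookup is handled by the registry-aware Avro deserializer; the sketch only shows why a single topic can safely mix record types.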
Contributor:

Explain that this is specific to the Confluent Schema Registry.


* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
Contributor:

We usually add to this list the connectors used in the example. In this case, it's also important to note that the example uses Avro with the Confluent Schema Registry.


This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).

A `KafkaSource` produces a stream of Avro data objects (`SpecificRecord`), fetching the writer's schema from AWS Glue Schema Registry. The Avro Kafka message value must have been serialized using AWS Glue Schema Registry.
Contributor:

I think you mean Confluent Schema Registry?

env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.enableCheckpointing(60000);
}
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
Contributor:

This is not really required

env.execute("avro-one-topic-many-subjects");
}

private static void setupAirQualityGenerator(String bootstrapServers, String sourceTopic, String schemaRegistryUrl, Map<String, Object> schemaRegistryConfig, StreamExecutionEnvironment env) {
Contributor:

Even though having the data generator within the same Flink app works, we deliberately avoid doing it in any of the examples, because building jobs with multiple dataflows is strongly discouraged.
We avoid any bad practice in the examples, so as not to suggest it may be a good idea.

I reckon it's more complicated, but you could add a separate module with a standalone Java application that generates data, similar to what we do in this example, even though in that case it's Kinesis.
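To illustrate the suggested structure (all names here are hypothetical, not from the PR), the standalone generator module could keep record generation separate from the transport, so the loop is testable without a broker; in the real module the sender callback would be a Kafka producer's send:

```java
import java.util.Map;
import java.util.Random;
import java.util.function.BiConsumer;

// Hypothetical sketch of a standalone data-generator application, as suggested
// in the review. Field names, the topic name, and the class name are made up
// for illustration; in a real module the sender would wrap a KafkaProducer.
public class AirQualityGenerator {
    private static final Random RND = new Random();

    // Build one synthetic reading (illustrative fields only).
    static Map<String, Object> nextReading() {
        return Map.of(
            "sensorId", "sensor-" + RND.nextInt(10),
            "pm25", RND.nextDouble() * 100,
            "timestamp", System.currentTimeMillis());
    }

    // Decouples generation from transport so the loop runs offline too.
    static void run(int count, BiConsumer<String, Map<String, Object>> sender) {
        for (int i = 0; i < count; i++) {
            sender.accept("air-quality", nextReading()); // topic name is hypothetical
        }
    }

    public static void main(String[] args) {
        run(3, (topic, record) -> System.out.println(topic + " -> " + record));
    }
}
```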

* strategies and event time extraction. However, for those scenarios to work
* all subjects should have a standard set of fields.
*/
class Option {
Contributor:

Maybe you can use org.apache.flink.types.SerializableOptional&lt;T&gt;, which comes with Flink.
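The reason this suggestion matters, shown with a quick stdlib-only check (Flink itself is not imported here): java.util.Optional deliberately does not implement Serializable, so it cannot be carried through Flink's serialization stack, which is why a hand-rolled Option type or Flink's SerializableOptional is needed at all:

```java
import java.io.Serializable;
import java.util.Optional;

// Demonstrates why java.util.Optional cannot be used directly in Flink
// records/functions that must be serialized: it is not Serializable.
public class OptionalCheck {
    static boolean optionalIsSerializable() {
        return Serializable.class.isAssignableFrom(Optional.class);
    }

    public static void main(String[] args) {
        System.out.println(optionalIsSerializable()); // prints false
    }
}
```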

}

// Custom deserialization schema for handling multiple generic Avro record types
class OptionDeserializationSchema implements KafkaRecordDeserializationSchema<Option> {
Contributor:

Please move this to a top-level class for readability.

}
}

class RecordNameSerializer<T> implements KafkaRecordSerializationSchema<T>
Contributor:

Move this to a top-level class as well.
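For readers unfamiliar with record-name based serialization: under Confluent Schema Registry's TopicRecordNameStrategy, the subject for a value is the topic name joined with the fully qualified record name, which is what allows one topic to register several Avro subjects. A minimal sketch of that naming rule (names below are illustrative, not from the PR):

```java
// Sketch of the subject naming a record-name based serializer relies on.
// Under TopicRecordNameStrategy the value subject is
// "<topic>-<fully.qualified.RecordName>", so each record type in the same
// topic gets its own subject and its own compatibility history.
public class SubjectNaming {
    static String topicRecordNameSubject(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        System.out.println(topicRecordNameSubject("sensor-data", "com.example.AirQuality"));
        // prints sensor-data-com.example.AirQuality
    }
}
```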

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP);
String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
Contributor:

The code building the dataflow is a bit hard to follow.
I would suggest doing what we tend to do in other examples:

  • In the runtime configuration, use a PropertyGroup for each source and sink, even if some configurations are repeated
  • Instantiate each Source and Sink in a local method, passing the Properties which contain all configuration for that specific component. Extract specific properties, like the topic name, within the method rather than in main() directly
  • Build the dataflow just by attaching the operators one after the other, using intermediate stream variables only when it helps readability
  • Avoid having methods that attach operators to the dataflow. Practically, any method which expects a DataStream or StreamExecutionEnvironment as a parameter should be avoided
  • If an operator implementation, like a map or a filter, is simple, try using a lambda and inlining it. If the operator implementation is complex, externalize it to a separate class

See examples here

We are not following these patterns in all examples, but we are trying to converge as much as possible.
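A stdlib-only sketch of the configuration side of these guidelines (group and key names are made up; in the real job the method would build and return a KafkaSource or KafkaSink): the builder method receives only its own property group and extracts component-specific settings locally, rather than in main():

```java
import java.util.Properties;

// Illustrative sketch of the per-component configuration pattern suggested in
// the review: main() hands each builder method its own Properties group, and
// the method extracts what it needs (topic, bootstrap servers) internally.
public class DataflowConfig {
    static String sourceDescription(Properties sourceProps) {
        String bootstrap = sourceProps.getProperty("bootstrap.servers");
        String topic = sourceProps.getProperty("topic"); // extracted here, not in main()
        return "KafkaSource(" + bootstrap + ", " + topic + ")";
    }

    public static void main(String[] args) {
        Properties sourceProps = new Properties();
        sourceProps.setProperty("bootstrap.servers", "localhost:9092");
        sourceProps.setProperty("topic", "sensor-data");
        System.out.println(sourceDescription(sourceProps));
        // prints KafkaSource(localhost:9092, sensor-data)
    }
}
```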
