Refactor Client-Example ConsumerConfigs #91
Conversation
I know this is just a draft, so I'm not sure what else you plan. But I think you should go deeper ... I do not think there is any need for special treatment of the TLS certificates or OAuth authentication. I think they can be configured from the KAFKA_... env vars as well.
", oauthTokenEndpointUri='" + oauthTokenEndpointUri + '\'' + | ||
", additionalConfig='" + additionalConfig + '\'' + | ||
", tracingSystem='" + tracingSystem + '\'' + | ||
'}'; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undo changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -46,8 +46,7 @@ public static void main(String[] args) {
KafkaClientSupplier supplier = new TracingKafkaClientSupplier();
streams = new KafkaStreams(builder.build(), props, supplier);
} else {
log.error("Error: TRACING_SYSTEM {} is not recognized or supported!", config.getTracingSystem());
streams = new KafkaStreams(builder.build(), props);
throw new RuntimeException("Error: TRACING_SYSTEM " + tracingSystem + " is not recognized or supported!");
}
} else {
streams = new KafkaStreams(builder.build(), props);

Undo changes
value: jaeger
---
apiVersion: apps/v1
kind: Deployment

Undo changes

Undone (should be correct now)
}

if ((config.getOauthAccessToken() != null)
        || (config.getOauthTokenEndpointUri() != null && config.getOauthClientId() != null && config.getOauthRefreshToken() != null)
        || (config.getOauthTokenEndpointUri() != null && config.getOauthClientId() != null && config.getOauthClientSecret() != null)) {

Undo indentation change

Full block of code removed. All security is being handled by the user via the envVars.
README.md (outdated)

* `STRIMZI_TOPIC` - name of topic which consumer subscribes
* `KAFKA_GROUP_ID` - specifies the consumer group id for the consumer
* `STRIMZI_MESSAGE_COUNT` - the number of messages the consumer should receive
* `STRIMZI_LOG_LEVEL` - logging level
* `ADDITIONAL_CONFIG` - additional configuration for a consumer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character

This can be removed

Including the STRIMZI_ prefixed ones?
README.md (outdated)

* `LOG_LEVEL` - logging level
* `KAFKA_BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
* `STRIMZI_TOPIC` - name of topic which consumer subscribes
* `KAFKA_GROUP_ID` - specifies the consumer group id for the consumer

Env vars used for Kafka client configuration can be removed.

I would also add a note to the README.md explaining that all Kafka client properties can be configured by adding an env var with the Kafka property name in capital letters, delimited by `_`, and prefixed with `KAFKA_`.
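The naming convention described in this comment can be sketched as a small helper. This is a hypothetical illustration (class and method names are assumed, not taken from the PR): strip the `KAFKA_` prefix, lowercase the rest, and turn `_` into `.`.

```java
import java.util.Locale;

// Hypothetical sketch of the env-var-to-property mapping described above.
public class EnvVarMapping {
    private static final String KAFKA_PREFIX = "KAFKA_";

    public static String convertEnvVarToPropertyKey(String envVar) {
        // KAFKA_BOOTSTRAP_SERVERS -> bootstrap.servers
        return envVar.substring(KAFKA_PREFIX.length())
                .toLowerCase(Locale.ROOT)
                .replace("_", ".");
    }

    public static void main(String[] args) {
        System.out.println(convertEnvVarToPropertyKey("KAFKA_BOOTSTRAP_SERVERS")); // bootstrap.servers
    }
}
```

Note that this simple mapping cannot express Kafka property names that themselves contain `_`, which is a known limitation of the convention rather than of this sketch.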
Done
@scholzj When you say "special treatment", do you mean these lines [1] [2]? I kept these in to give the same behaviour as the code before it. Are you saying we can strip this code out completely? If so, are we removing some of the assistance we're offering to the user? So, the user will have to make sure SSL, OAuth etc. are configured properly from their end? Thanks

[1] client-examples/java/kafka/consumer/src/main/java/KafkaConsumerConfig.java Lines 90 to 99 in e40023a
[2] client-examples/java/kafka/consumer/src/main/java/KafkaConsumerConfig.java Lines 101 to 111 in e40023a

Yes, that is the code I mean. Also the (de)serializers for example. You can call it an assistance. But it also makes it unclear how exactly it is configured. Because one day when they need to write their own app, they will need to dig into the code to understand how to actually do it. I think anything that ends up in the consumer properties file should go directly through the env vars.

@scholzj @kyguy I've removed the SSL and OAuth configs and only left the USER_CONFIGS in. These are the ones that will have a STRIMZI_ prefix. If you guys give me the go ahead that this is the way to go, I'll address the other comments @kyguy left and then move on to the Producer and Streams. Thanks guys.
Some more comments, but it starts looking better. You should also update all the YAML example files to make it work with the new code.
README.md (outdated)
* `STRIMZI_TOPIC` - name of topic which consumer subscribes
* `KAFKA_GROUP_ID` - specifies the consumer group id for the consumer
* `STRIMZI_MESSAGE_COUNT` - the number of messages the consumer should receive
* `STRIMZI_LOG_LEVEL` - logging level
* `ADDITIONAL_CONFIG` - additional configuration for a consumer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character

You can remove this as well I guess.

@scholzj I've removed ADDITIONAL_CONFIG as it's no longer part of ConsumerConfig. Do you suggest removing the STRIMZI_ prefixed envVar examples too and just leave in the explanation at the beginning of the ## Configuration paragraph?

.yaml files updated with correct prefixes.
", enableAutoCommit='" + enableAutoCommit + '\'' + | ||
", messageCount=" + messageCount + | ||
", tracingSystem='" + tracingSystem + '\'' + | ||
kafkaConfigOptionsToString() + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? The Kafka client prints its config at startup. If you want to keep this, you should check the formatting to fit into the other fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed now as we have kafkaConfigOptionsToString()
. Removed
private static final Map<String, String> USER_CONFIGS = System.getenv()
        .entrySet()
        .stream()
        .filter(map -> map.getKey().startsWith(KAFKA_PREFIX))
        .collect(Collectors.toMap(map -> convertEnvVarToPropertyKey(map.getKey()), map -> map.getValue()));

- Should you use Properties directly here?
- I also wonder if this should be static. In general, for things such as testing etc., it would be better to handle this in the constructor or in the `fromEnv` method. You might not test it today, but maybe you can add some tests for this in the future.

Discuss today with @kyguy
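One way to read the reviewer's two points together is a sketch like the following (hypothetical class and method names, assuming the `KAFKA_` prefix convention from earlier in the thread): collect the user-supplied options into a `Properties` object, and take the environment map as a parameter instead of reading `System.getenv()` in a static initializer, so the logic can be unit-tested.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: build user-supplied Kafka options as Properties.
// Taking the map as a parameter keeps the method testable without
// touching the real process environment.
public class UserConfigs {
    private static final String KAFKA_PREFIX = "KAFKA_";

    public static Properties fromMap(Map<String, String> env) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            if (entry.getKey().startsWith(KAFKA_PREFIX)) {
                String key = entry.getKey()
                        .substring(KAFKA_PREFIX.length())
                        .toLowerCase(Locale.ROOT)
                        .replace("_", ".");
                props.setProperty(key, entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromMap(Map.of("KAFKA_GROUP_ID", "my-group", "STRIMZI_TOPIC", "my-topic"));
        System.out.println(props.getProperty("group.id"));      // my-group
        System.out.println(props.getProperty("strimzi.topic")); // null: non-KAFKA_ vars are ignored
    }
}
```

Production code would call `fromMap(System.getenv())`, while tests can pass any map they like.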
"bootstrapServers='" + bootstrapServers + '\'' + | ||
", topic='" + topic + '\'' + | ||
", delay=" + delay + | ||
", messageCount=" + messageCount + | ||
", message='" + message + '\'' + | ||
", acks='" + acks + '\'' + | ||
", headers='" + headers + '\'' + | ||
", sslTruststoreCertificates='" + sslTruststoreCertificates + '\'' + | ||
", sslKeystoreKey='" + sslKeystoreKey + '\'' + | ||
", sslKeystoreCertificateChain='" + sslKeystoreCertificateChain + '\'' + | ||
", oauthClientId='" + oauthClientId + '\'' + | ||
", oauthClientSecret='" + oauthClientSecret + '\'' + | ||
", oauthAccessToken='" + oauthAccessToken + '\'' + | ||
", oauthRefreshToken='" + oauthRefreshToken + '\'' + | ||
", oauthTokenEndpointUri='" + oauthTokenEndpointUri + '\'' + | ||
", additionalConfig='" + additionalConfig + '\'' + | ||
", tracingSystem='" + tracingSystem + '\'' + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should not be part of this PR? Ideally, you should try to keep the PRs focused on a single thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove
@@ -46,8 +46,7 @@ public static void main(String[] args) {
KafkaClientSupplier supplier = new TracingKafkaClientSupplier();
streams = new KafkaStreams(builder.build(), props, supplier);
} else {
log.error("Error: TRACING_SYSTEM {} is not recognized or supported!", config.getTracingSystem());
streams = new KafkaStreams(builder.build(), props);
throw new RuntimeException("Error: TRACING_SYSTEM " + tracingSystem + " is not recognized or supported!");

Same as above ... Maybe this should not be part of this PR? Ideally, you should try to keep the PRs focused on a single thing.

Will remove

Revert to original
private final String topic;
private final String groupId;
private final String autoOffsetReset = "earliest";
private final String enableAutoCommit = "false";

This seems to be a weird left-over. You do not set it, but you keep it in the code. If it is needed, maybe you should check if it is enabled or not through the Kafka properties and set it from there? (https://kafka.apache.org/documentation/#consumerconfigs_enable.auto.commit)

autoOffsetReset removed. Discuss enableAutoCommit with @kyguy

I think it is a bit more complicated than just removing it. The auto-commit is part of Kafka. But what behaviour do you expect when it is disabled? How does the client do the commits etc.? That is something you need to consider.
README.md (outdated)

* `KAFKA_BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
* `STRIMZI_TOPIC` - name of topic which consumer subscribes
* `KAFKA_GROUP_ID` - specifies the consumer group id for the consumer
* `STRIMZI_MESSAGE_COUNT` - the number of messages the consumer should receive
* `STRIMZI_LOG_LEVEL` - logging level

You should expand this a bit. List the Strimzi-specific options and elaborate that any Kafka option can be used as an env var by prefixing it with `KAFKA_`, making it uppercase, and using `_` instead of `.`.

Added:

Below are listed and described environmental variables that are NOT Kafka-specific but must be used. These environmental variables should be prefixed with uppercase STRIMZI_ using _ instead of . (eg. STRIMZI_TOPIC). Any Kafka configuration option can be used as an environmental variable and should be prefixed with uppercase KAFKA_ using _ instead of . (eg. KAFKA_BOOTSTRAP_SERVERS).
@@ -41,7 +43,7 @@ public static void main(String[] args) {
}
}

boolean commit = !Boolean.parseBoolean(config.getEnableAutoCommit());
boolean commit = !Boolean.parseBoolean (props.getProperty(ENABLE_AUTO_COMMIT_CONFIG));

Suggested change:
- boolean commit = !Boolean.parseBoolean (props.getProperty(ENABLE_AUTO_COMMIT_CONFIG));
+ boolean commit = !Boolean.parseBoolean(props.getProperty(ENABLE_AUTO_COMMIT_CONFIG));
java/kafka/deployment-ssl.yaml (outdated)

value: java-kafka-consumer
- name: LOG_LEVEL
- name: STRIMZI_ LOG_LEVEL

Suggested change:
- - name: STRIMZI_ LOG_LEVEL
+ - name: STRIMZI_LOG_LEVEL
@@ -3,7 +3,7 @@ kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
    strimzi.io/cluster: my-cluster

Extra two spaces removed
- name: JAEGER_SAMPLER_PARAM
  value: "1"
- name: TRACING_SYSTEM
  value: jaeger

Add two space indentation back
@@ -47,8 +47,7 @@ public static void main(String[] args) {
streams = new KafkaStreams(builder.build(), props, supplier);
} else {
log.error("Error: TRACING_SYSTEM {} is not recognized or supported!", config.getTracingSystem());
streams = new KafkaStreams(builder.build(), props);
}

Rogue formatting change, can be undone

Just some formatting nits

Left a few nits, mostly around formatting etc. But looks good overall.
README.md (outdated)

@@ -57,7 +57,9 @@ To run the OAuth example, you will need to have your Kafka cluster configured wi
## Configuration

Although this Hello World is simple example it is fully configurable.
Below are listed and described environmental variables.
Below are listed and described environmental variables that are NOT Kafka-specific but must be used.
These environmental variables should be prefixed with `STRIMZI_` using `_` instead of `.` (eg. STRIMZI_TOPIC).

These are just random environment variables. The `_` versus `.` makes no sense here. It makes sense only for the `KAFKA_` env vars which map to actual Kafka configuration properties.

Removed and refactored to fit the uniformity of the rest of the README.
README.md (outdated)

Below are listed and described environmental variables.
Below are listed and described environmental variables that are NOT Kafka-specific but must be used.
These environmental variables should be prefixed with `STRIMZI_` using `_` instead of `.` (eg. STRIMZI_TOPIC).
Any Kafka configuration option can be given as an environmental variable and should be prefixed with `KAFKA_` using `_` instead of `.` (eg. bootstrap.servers -> KAFKA_BOOTSTRAP_SERVERS).

Suggested change:
- Any Kafka configuration option can be given as an environmental variable and should be prefixed with `KAFKA_` using `_` instead of `.` (eg. bootstrap.servers -> KAFKA_BOOTSTRAP_SERVERS).
+ Any Kafka configuration option can be given as an environmental variable and should be prefixed with `KAFKA_` using `_` instead of `.` (eg. `bootstrap.servers` -> `KAFKA_BOOTSTRAP_SERVERS`).

Added back in
README.md (outdated)

Below are listed and described environmental variables that are NOT Kafka-specific but must be used.
These environmental variables should be prefixed with `STRIMZI_` using `_` instead of `.` (eg. STRIMZI_TOPIC).
Any Kafka configuration option can be given as an environmental variable and should be prefixed with `KAFKA_` using `_` instead of `.` (eg. bootstrap.servers -> KAFKA_BOOTSTRAP_SERVERS).

So, does this currently apply to consumers only? It is fine to do it step by step. But you should make it clear here in the README. Right now it sounds like this applies for producers and Streams as well.

Refactored to specify Consumer only (until later updates)
public String getBootstrapServers() {
    return bootstrapServers;
public static String convertEnvVarToPropertyKey(String envVar) {
    System.out.println("ENV_VAR is " + envVar);

This should use a logger if you want to log this (doesn't seem needed to me, but why not) or be removed if it is some leftover from debugging.

Removed altogether
README.md (outdated)

@@ -57,7 +57,10 @@ To run the OAuth example, you will need to have your Kafka cluster configured wi
## Configuration

Although this Hello World is simple example it is fully configurable.
Below are listed and described environmental variables.
Below are listed and described environment variables.

I think we use "environment variable" everywhere in our docs. Can we change this way everywhere you are using "environmental"?

Done
.entrySet()
.stream()
.filter(map -> map.getKey().startsWith(KAFKA_PREFIX))
.collect(Collectors.toMap(map -> convertEnvVarToPropertyKey(map.getKey()), map -> map.getValue())));

Is `map` appropriate as a variable here? You are getting a `Map.Entry` instance, right? I would be more for naming it `entry` or `mapEntry`, but not just `map`, which makes the reader think about the whole map itself.

Refactored to use mapEntry
@@ -41,7 +43,7 @@ public static void main(String[] args) {
}
}

boolean commit = !Boolean.parseBoolean(config.getEnableAutoCommit());
boolean commit = !Boolean.parseBoolean(props.getProperty(ENABLE_AUTO_COMMIT_CONFIG));

Wrong indentation, missing space?

Fixed
Too late I guess. But we should have first approved and merged the consumer config you had. This just makes it harder to review and fix things.
return props;
String topic = System.getenv("STRIMZI_TOPIC");
Long messageCount = System.getenv("STRIMZI_MESSAGE_COUNT") == null ? DEFAULT_MESSAGES_COUNT : Long.parseLong(System.getenv("STRIMZI_MESSAGE_COUNT"));
int delay = System.getenv("STRIMZI_DELAY") == null ? DEFAULT_DELAY : Integer.parseInt(System.getenv("STRIMZI_DELAY"));

Suggested change:
- int delay = System.getenv("STRIMZI_DELAY") == null ? DEFAULT_DELAY : Integer.parseInt(System.getenv("STRIMZI_DELAY"));
+ int delay = Integer.parseInt(System.getenv("STRIMZI_DELAY_MS"));
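One caveat with the suggested one-liner: `Integer.parseInt(null)` throws a `NumberFormatException`, so it fails whenever `STRIMZI_DELAY_MS` is unset. A default-aware helper along these lines would keep the old fallback behaviour (hypothetical sketch, not part of the PR):

```java
// Hypothetical helper: parse an env var as an int, falling back to a
// default when the variable is unset. Without the null check,
// Integer.parseInt(null) would throw a NumberFormatException.
public class EnvUtil {
    public static int getIntEnv(String name, int defaultValue) {
        String value = System.getenv(name);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        // Falls back to the default when the variable is not set.
        System.out.println(getIntEnv("STRIMZI_DELAY_MS", 1000));
    }
}
```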
public class KafkaProducerConfig {
    private static final Logger log = LogManager.getLogger(KafkaProducerConfig.class);

    private static final long DEFAULT_MESSAGES_COUNT = 10;
    private static final String DEFAULT_MESSAGE = "Hello world";
    private final String bootstrapServers;
    private static final int DEFAULT_DELAY = 1;

Can be removed
@@ -19,6 +19,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

Remove
boolean transactionalProducer = config.getAdditionalConfig().contains("transactional.id");
// boolean transactionalProducer = !Boolean.parseBoolean(props.getProperty(TRANSACTIONAL_ID_CONFIG));
boolean transactionalProducer = config.getProperties().contains("transactional.id");
// boolean transactionalProducer = config.getAdditionalConfig().contains("transactional.id");

Don't forget to clean up

Reverted to before refactoring ConsumerConfig
@@ -46,8 +46,7 @@ public static void main(String[] args) {
KafkaClientSupplier supplier = new TracingKafkaClientSupplier();
streams = new KafkaStreams(builder.build(), props, supplier);
} else {
log.error("Error: TRACING_SYSTEM {} is not recognized or supported!", config.getTracingSystem());
streams = new KafkaStreams(builder.build(), props);
throw new RuntimeException("Error: TRACING_SYSTEM " + tracingSystem + " is not recognized or supported!");

Revert to original
@@ -46,13 +46,12 @@ public static void main(String[] args) {
KafkaClientSupplier supplier = new TracingKafkaClientSupplier();
streams = new KafkaStreams(builder.build(), props, supplier);
} else {
log.error("Error: TRACING_SYSTEM {} is not recognized or supported!", config.getTracingSystem());
streams = new KafkaStreams(builder.build(), props);

Add back in, revert to original

Done
Force-pushed be1363d to 8993420
Signed-off-by: Owen <owencorrigan76@gmail.com>
Force-pushed 8993420 to 13836f0
@@ -54,4 +54,4 @@ public static void main(String[] args) {

streams.start();
}
}

Undo
@OwenCorrigan76 can we move out of the draft state?

Agree with @scholzj ... but it seems it is not too late yet. There are no changes on the producer side yet :-)
Signed-off-by: Owen <owencorrigan76@gmail.com>
@scholzj @ppatierno I had done some work on ProducerConfigs but I reverted everything back to before I changed ProducerConfigs. I guess I'll concentrate on Consumer first. Get it merged and then open a new PR for Producer. When that is merged, I'll open a separate PR for Streams. This means that any of Kyle's comments regarding ProducerConfigs above can be ignored. I know it's messy but I originally thought you guys would prefer Producer / Consumer and Streams done in the same PR (my mistake).
public String getBootstrapServers() {
    return bootstrapServers;
public static String convertEnvVarToPropertyKey(String envVar) {

Can be private I guess?

Refactored
README.md (outdated)

@@ -57,7 +57,10 @@ To run the OAuth example, you will need to have your Kafka cluster configured wi
## Configuration

Although this Hello World is simple example it is fully configurable.
Below are listed and described environmental variables.
Below are listed and described environment variables.
Regarding Consumer Only (until later updates): below are listed and described environmental variables that are NOT Kafka-specific but must be used.

`Only` with a lowercase `o`, I guess? Also, is `must be used` the right way to phrase it?

Refactored
@@ -57,7 +57,10 @@ To run the OAuth example, you will need to have your Kafka cluster configured wi
## Configuration

Maybe you can do something like this (just the `Configuration` section):
## Configuration
Below are listed and described the available environment variables that can be used for configuration.
### Producer
* `BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
* `TOPIC` - the topic the producer will send to
* `DELAY_MS` - the delay, in ms, between messages
* `MESSAGE_COUNT` - the number of messages the producer should send
* `CA_CRT` - the certificate of the CA which signed the brokers' TLS certificates, for adding to the client's trust store
* `USER_CRT` - the user's certificate
* `USER_KEY` - the user's private key
* `LOG_LEVEL` - logging level
* `PRODUCER_ACKS` - acknowledgement level
* `HEADERS` - custom headers list separated by commas of `key1=value1, key2=value2`
* `BLOCKING_PRODUCER` - if it's set, the producer will block another message until ack will be received
* `MESSAGES_PER_TRANSACTION` - how many messages will be part of one transaction. Transaction config could be set via `ADDITIONAL_CONFIG` variable. Default is 10.
* `ADDITIONAL_CONFIG` - additional configuration for a producer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character
* `TRACING_SYSTEM` - if it's set to `jaeger` or `opentelemetry`, this will enable tracing.
### Consumer
* `STRIMZI_TOPIC` - name of topic to which consumer subscribes
* `STRIMZI_MESSAGE_COUNT` - the number of messages the consumer should receive
* `STRIMZI_LOG_LEVEL` - logging level
* `STRIMZI_TRACING_SYSTEM` - if it's set to `jaeger` or `opentelemetry`, this will enable tracing.
Additionally, any Kafka Consumer API configuration option can be passed as an environmental variable.
It should be prefixed with `KAFKA_` and use `_` instead of `.`.
For example environment variable `KAFKA_BOOTSTRAP_SERVERS` will be used as the `bootstrap.servers` configuration option in the Kafka Consumer API.
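The `KAFKA_` prefix convention above could be implemented roughly as follows. This is an illustrative sketch only, with hypothetical class and method names; the real consumer would pass `System.getenv()` to it:

```java
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Illustrative sketch of the KAFKA_ prefix convention: an env var such as
// KAFKA_BOOTSTRAP_SERVERS becomes the bootstrap.servers property.
public class KafkaEnvVarMapper {

    private static final String PREFIX = "KAFKA_";

    public static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                // Strip the prefix, lowercase, and turn '_' into '.'
                String key = entry.getKey()
                        .substring(PREFIX.length())
                        .toLowerCase(Locale.ROOT)
                        .replace('_', '.');
                props.setProperty(key, entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = fromEnv(Map.of(
                "KAFKA_BOOTSTRAP_SERVERS", "my-cluster-kafka-bootstrap:9092",
                "HOME", "/root"));             // ignored: no KAFKA_ prefix
        System.out.println(p.getProperty("bootstrap.servers")); // prints "my-cluster-kafka-bootstrap:9092"
    }
}
```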
### Streams
* `BOOTSTRAP_SERVERS` - a comma-separated list of host and port pairs giving the addresses of the Kafka brokers. Each pair has the form `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
* `APPLICATION_ID` - The Kafka Streams application ID
* `SOURCE_TOPIC` - name of topic which will be used as the source of messages
* `TARGET_TOPIC` - name of topic where the transformed messages are sent
* `COMMIT_INTERVAL_MS` - the interval for the Kafka Streams consumer part committing the offsets
* `CA_CRT` - the certificate of the CA which signed the brokers' TLS certificates, for adding to the client's trust store
* `USER_CRT` - the user's certificate
* `USER_KEY` - the user's private key
* `LOG_LEVEL` - logging level
* `ADDITIONAL_CONFIG` - additional configuration for a streams application. Note that it can also override any previously set variable. The format is `key=value` records separated by newline characters.
* `TRACING_SYSTEM` - if it's set to `jaeger` or `opentelemetry`, this will enable tracing.
### Tracing
The examples support tracing using the [OpenTracing Apache Kafka Instrumentation](https://github.com/opentracing-contrib/java-kafka-client),
[OpenTelemetry Java Instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation) and the [Jaeger project](https://www.jaegertracing.io/).
To enable tracing, configure the Jaeger Tracer using [environment variables](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment).
To run Jaeger Tracing, you can also use the provided example in [`deployment-tracing-jaeger.yaml`](./java/kafka/deployment-tracing-jaeger.yaml).
To run OpenTelemetry Tracing, you can also use the provided example in [`deployment-tracing-opentelemetry.yaml`](./java/kafka/deployment-tracing-opentelemetry.yaml).
Jaeger / OpenTracing tracing is supported only in the consumer and producer, because OpenTracing support for the Kafka Streams API is not compatible with the latest Kafka versions.
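The `TRACING_SYSTEM` handling in this PR (fail fast on an unrecognized value instead of silently ignoring it) could be sketched like this. The class and method names are illustrative; the error message mirrors the one in the diff:

```java
// Illustrative sketch of TRACING_SYSTEM validation: unset/empty disables tracing,
// jaeger/opentelemetry enable it, and anything else fails fast with an exception,
// matching the behaviour introduced in this PR.
public class TracingSystemValidator {

    public static String validate(String tracingSystem) {
        if (tracingSystem == null || tracingSystem.isEmpty()) {
            return "disabled";                 // tracing not configured
        }
        switch (tracingSystem) {
            case "jaeger":
            case "opentelemetry":
                return tracingSystem;
            default:
                throw new RuntimeException(
                        "Error: TRACING_SYSTEM " + tracingSystem + " is not recognized or supported!");
        }
    }

    public static void main(String[] args) {
        System.out.println(validate("jaeger"));   // prints "jaeger"
        System.out.println(validate(null));       // prints "disabled"
    }
}
```

Throwing instead of logging and continuing makes a misconfigured deployment visible immediately rather than running untraced.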
Makes sense. Refactored to use this suggestion.
Signed-off-by: Owen <owencorrigan76@gmail.com>
Thanks for the PR!
* for refactoring
* refactored ConsumerConfigs
* refactored ConsumerConfigs
* removed Ssl and Oauth configs
* Addressing Comments
* Indentation in jaeger.yaml
* Addressing Kyle's Comments
* Addressing Jakub's Comments and changing Repo name
* Addressing Paolo's Comments
* Addressing Jakub and Paolo's Comments
* Further Addressing Jakub's Comments

Signed-off-by: Owen <owencorrigan76@gmail.com>
Refactor the Producer, Consumer and Streams config classes in the Java clients by reducing their complexity and size. Standardise the naming scheme of the environment variables used to configure the clients, i.e. standardise the env vars used to configure the Kafka client properties, as outlined in [1].
Example of standardised environment variables:
[1] https://github.com/kyguy/proposals/blob/refactor-client-examples/040-refactor-client-examples.md