Skip to content

Commit 3b801e4

Browse files
authored
Parse additional config at the end of props setup to allow rewrite configurations (#55)
* UPD: Various client updates Main goal of my changes is to get rid of sasl.login.callback.handler.class, as it was used there hardwired. Using this additional configuration, sasl.mechanism can be overridden as well (hardwired to OAUTHBEARER now) Minor updates: - some clean up code - simplify statements - added readme docs for this new env variable Signed-off-by: Michal Tóth <mtoth@redhat.com> * UPD: Remove custom SASL_CALLBACK_CLASS env var - pass it via additional Config Signed-off-by: Michal Tóth <mtoth@redhat.com>
1 parent 6042b71 commit 3b801e4

File tree

5 files changed

+117
-81
lines changed

5 files changed

+117
-81
lines changed

README.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ Producer
6363
* `USER_CRT` - the user's certificate
6464
* `USER_KEY` - the user's private key
6565
* `LOG_LEVEL` - logging level
66-
* `PRODUCER_ACKS` = acknowledgement level
67-
* `HEADERS` = custom headers list separated by commas of `key1=value1, key2=value2`
68-
* `ADDITIONAL_CONFIG` = additional configuration for a producer application. The form is `key=value` records separated by new line character
69-
* `BLOCKING_PRODUCER` = if it's set, the producer will block another message until ack will be received
70-
* `MESSAGES_PER_TRANSACTION` = how many messages will be part of one transaction. Transaction config could be set via `ADDITIONAL_CONFIG` variable. Default is 10.
66+
* `PRODUCER_ACKS` - acknowledgement level
67+
* `HEADERS` - custom headers list separated by commas of `key1=value1, key2=value2`
68+
* `BLOCKING_PRODUCER` - if it's set, the producer will block another message until ack will be received
69+
* `MESSAGES_PER_TRANSACTION` - how many messages will be part of one transaction. Transaction config could be set via `ADDITIONAL_CONFIG` variable. Default is 10.
70+
* `ADDITIONAL_CONFIG` - additional configuration for a producer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character
7171

7272
Consumer
7373
* `BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
@@ -78,7 +78,7 @@ Consumer
7878
* `USER_CRT` - the user's certificate
7979
* `USER_KEY` - the user's private key
8080
* `LOG_LEVEL` - logging level
81-
* `ADDITIONAL_CONFIG` = additional configuration for a consumer application. The form is `key=value` records separated by new line character
81+
* `ADDITIONAL_CONFIG` - additional configuration for a consumer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character
8282

8383
Streams
8484
* `BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
@@ -90,7 +90,7 @@ Streams
9090
* `USER_CRT` - the user's certificate
9191
* `USER_KEY` - the user's private key
9292
* `LOG_LEVEL` - logging level
93-
* `ADDITIONAL_CONFIG` = additional configuration for a streams application. The form is `key=value` records separated by new line character
93+
* `ADDITIONAL_CONFIG` - additional configuration for a streams application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character.
9494

9595
### Tracing
9696

java/kafka/consumer/src/main/java/KafkaConsumerConfig.java

+38-22
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,14 @@ public class KafkaConsumerConfig {
3434
private final String oauthRefreshToken;
3535
private final String oauthTokenEndpointUri;
3636
private final String additionalConfig;
37+
private final String saslLoginCallbackClass = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
38+
;
3739

38-
public KafkaConsumerConfig(String bootstrapServers, String topic, String groupId, String clientRack, Long messageCount, String trustStorePassword, String trustStorePath, String keyStorePassword, String keyStorePath, String oauthClientId, String oauthClientSecret, String oauthAccessToken, String oauthRefreshToken, String oauthTokenEndpointUri, String additionalConfig) {
40+
41+
public KafkaConsumerConfig(String bootstrapServers, String topic, String groupId, String clientRack, Long messageCount,
42+
String trustStorePassword, String trustStorePath, String keyStorePassword, String keyStorePath,
43+
String oauthClientId, String oauthClientSecret, String oauthAccessToken, String oauthRefreshToken,
44+
String oauthTokenEndpointUri, String additionalConfig) {
3945
this.bootstrapServers = bootstrapServers;
4046
this.topic = topic;
4147
this.groupId = groupId;
@@ -51,26 +57,29 @@ public KafkaConsumerConfig(String bootstrapServers, String topic, String groupId
5157
this.oauthRefreshToken = oauthRefreshToken;
5258
this.oauthTokenEndpointUri = oauthTokenEndpointUri;
5359
this.additionalConfig = additionalConfig;
60+
;
5461
}
5562

5663
public static KafkaConsumerConfig fromEnv() {
5764
String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS");
5865
String topic = System.getenv("TOPIC");
5966
String groupId = System.getenv("GROUP_ID");
60-
String clientRack = System.getenv("CLIENT_RACK") == null ? null : System.getenv("CLIENT_RACK");
67+
String clientRack = System.getenv("CLIENT_RACK");
6168
Long messageCount = System.getenv("MESSAGE_COUNT") == null ? DEFAULT_MESSAGES_COUNT : Long.valueOf(System.getenv("MESSAGE_COUNT"));
62-
String trustStorePassword = System.getenv("TRUSTSTORE_PASSWORD") == null ? null : System.getenv("TRUSTSTORE_PASSWORD");
63-
String trustStorePath = System.getenv("TRUSTSTORE_PATH") == null ? null : System.getenv("TRUSTSTORE_PATH");
64-
String keyStorePassword = System.getenv("KEYSTORE_PASSWORD") == null ? null : System.getenv("KEYSTORE_PASSWORD");
65-
String keyStorePath = System.getenv("KEYSTORE_PATH") == null ? null : System.getenv("KEYSTORE_PATH");
69+
String trustStorePassword = System.getenv("TRUSTSTORE_PASSWORD");
70+
String trustStorePath = System.getenv("TRUSTSTORE_PATH");
71+
String keyStorePassword = System.getenv("KEYSTORE_PASSWORD");
72+
String keyStorePath = System.getenv("KEYSTORE_PATH");
6673
String oauthClientId = System.getenv("OAUTH_CLIENT_ID");
6774
String oauthClientSecret = System.getenv("OAUTH_CLIENT_SECRET");
6875
String oauthAccessToken = System.getenv("OAUTH_ACCESS_TOKEN");
6976
String oauthRefreshToken = System.getenv("OAUTH_REFRESH_TOKEN");
7077
String oauthTokenEndpointUri = System.getenv("OAUTH_TOKEN_ENDPOINT_URI");
7178
String additionalConfig = System.getenv().getOrDefault("ADDITIONAL_CONFIG", "");
7279

73-
return new KafkaConsumerConfig(bootstrapServers, topic, groupId, clientRack, messageCount, trustStorePassword, trustStorePath, keyStorePassword, keyStorePath, oauthClientId, oauthClientSecret, oauthAccessToken, oauthRefreshToken, oauthTokenEndpointUri, additionalConfig);
80+
return new KafkaConsumerConfig(bootstrapServers, topic, groupId, clientRack, messageCount, trustStorePassword, trustStorePath,
81+
keyStorePassword, keyStorePath, oauthClientId, oauthClientSecret, oauthAccessToken, oauthRefreshToken, oauthTokenEndpointUri,
82+
additionalConfig);
7483
}
7584

7685
public static Properties createProperties(KafkaConsumerConfig config) {
@@ -85,20 +94,6 @@ public static Properties createProperties(KafkaConsumerConfig config) {
8594
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
8695
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
8796

88-
if (!config.getAdditionalConfig().isEmpty()) {
89-
StringTokenizer tok = new StringTokenizer(config.getAdditionalConfig(), ", \t\n\r");
90-
while (tok.hasMoreTokens()) {
91-
String record = tok.nextToken();
92-
int endIndex = record.indexOf('=');
93-
if (endIndex == -1) {
94-
throw new RuntimeException("Failed to parse Map from String");
95-
}
96-
String key = record.substring(0, endIndex);
97-
String value = record.substring(endIndex + 1);
98-
props.put(key.trim(), value.trim());
99-
}
100-
}
101-
10297
if (config.getTrustStorePassword() != null && config.getTrustStorePath() != null) {
10398
log.info("Configuring truststore");
10499
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
@@ -115,15 +110,36 @@ public static Properties createProperties(KafkaConsumerConfig config) {
115110
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, config.getKeyStorePath());
116111
}
117112

113+
Properties additionalProps = new Properties();
114+
if (!config.getAdditionalConfig().isEmpty()) {
115+
StringTokenizer tok = new StringTokenizer(config.getAdditionalConfig(), System.lineSeparator());
116+
while (tok.hasMoreTokens()) {
117+
String record = tok.nextToken();
118+
int endIndex = record.indexOf('=');
119+
if (endIndex == -1) {
120+
throw new RuntimeException("Failed to parse Map from String");
121+
}
122+
String key = record.substring(0, endIndex);
123+
String value = record.substring(endIndex + 1);
124+
additionalProps.put(key.trim(), value.trim());
125+
}
126+
}
127+
118128
if ((config.getOauthAccessToken() != null)
119129
|| (config.getOauthTokenEndpointUri() != null && config.getOauthClientId() != null && config.getOauthRefreshToken() != null)
120130
|| (config.getOauthTokenEndpointUri() != null && config.getOauthClientId() != null && config.getOauthClientSecret() != null)) {
131+
log.info("Configuring OAuth");
121132
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
122133
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL".equals(props.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) ? "SASL_SSL" : "SASL_PLAINTEXT");
123134
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
124-
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
135+
if (!(additionalProps.containsKey(SaslConfigs.SASL_MECHANISM) && additionalProps.getProperty(SaslConfigs.SASL_MECHANISM).equals("PLAIN"))) {
136+
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, config.saslLoginCallbackClass);
137+
}
125138
}
126139

140+
// override properties with defined additional properties
141+
props.putAll(additionalProps);
142+
127143
return props;
128144
}
129145

java/kafka/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<kafka.version>2.7.0</kafka.version>
2222
<opentracing-kafka.version>0.1.11</opentracing-kafka.version>
2323
<jaeger.version>1.1.0</jaeger.version>
24-
<strimzi-oauth-callback.version>0.6.1</strimzi-oauth-callback.version>
24+
<strimzi-oauth-callback.version>0.7.1</strimzi-oauth-callback.version>
2525
</properties>
2626

2727
<dependencyManagement>

java/kafka/producer/src/main/java/KafkaProducerConfig.java

+35-22
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,12 @@ public class KafkaProducerConfig {
3535
private final String oauthRefreshToken;
3636
private final String oauthTokenEndpointUri;
3737
private final String additionalConfig;
38+
private final String saslLoginCallbackClass = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
3839

39-
public KafkaProducerConfig(String bootstrapServers, String topic, int delay, Long messageCount, String message, String trustStorePassword, String trustStorePath, String keyStorePassword, String keyStorePath, String oauthClientId, String oauthClientSecret, String oauthAccessToken, String oauthRefreshToken, String oauthTokenEndpointUri, String acks, String additionalConfig, String headers) {
40+
public KafkaProducerConfig(String bootstrapServers, String topic, int delay, Long messageCount, String message,
41+
String trustStorePassword, String trustStorePath, String keyStorePassword, String keyStorePath,
42+
String oauthClientId, String oauthClientSecret, String oauthAccessToken, String oauthRefreshToken,
43+
String oauthTokenEndpointUri, String acks, String additionalConfig, String headers) {
4044
this.bootstrapServers = bootstrapServers;
4145
this.topic = topic;
4246
this.delay = delay;
@@ -59,13 +63,13 @@ public KafkaProducerConfig(String bootstrapServers, String topic, int delay, Lon
5963
public static KafkaProducerConfig fromEnv() {
6064
String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS");
6165
String topic = System.getenv("TOPIC");
62-
int delay = Integer.valueOf(System.getenv("DELAY_MS"));
66+
int delay = Integer.parseInt(System.getenv("DELAY_MS"));
6367
Long messageCount = System.getenv("MESSAGE_COUNT") == null ? DEFAULT_MESSAGES_COUNT : Long.valueOf(System.getenv("MESSAGE_COUNT"));
6468
String message = System.getenv("MESSAGE") == null ? DEFAULT_MESSAGE : System.getenv("MESSAGE");
65-
String trustStorePassword = System.getenv("TRUSTSTORE_PASSWORD") == null ? null : System.getenv("TRUSTSTORE_PASSWORD");
66-
String trustStorePath = System.getenv("TRUSTSTORE_PATH") == null ? null : System.getenv("TRUSTSTORE_PATH");
67-
String keyStorePassword = System.getenv("KEYSTORE_PASSWORD") == null ? null : System.getenv("KEYSTORE_PASSWORD");
68-
String keyStorePath = System.getenv("KEYSTORE_PATH") == null ? null : System.getenv("KEYSTORE_PATH");
69+
String trustStorePassword = System.getenv("TRUSTSTORE_PASSWORD");
70+
String trustStorePath = System.getenv("TRUSTSTORE_PATH");
71+
String keyStorePassword = System.getenv("KEYSTORE_PASSWORD");
72+
String keyStorePath = System.getenv("KEYSTORE_PATH");
6973
String oauthClientId = System.getenv("OAUTH_CLIENT_ID");
7074
String oauthClientSecret = System.getenv("OAUTH_CLIENT_SECRET");
7175
String oauthAccessToken = System.getenv("OAUTH_ACCESS_TOKEN");
@@ -75,7 +79,9 @@ public static KafkaProducerConfig fromEnv() {
7579
String headers = System.getenv("HEADERS");
7680
String additionalConfig = System.getenv().getOrDefault("ADDITIONAL_CONFIG", "");
7781

78-
return new KafkaProducerConfig(bootstrapServers, topic, delay, messageCount, message, trustStorePassword, trustStorePath, keyStorePassword, keyStorePath, oauthClientId, oauthClientSecret, oauthAccessToken, oauthRefreshToken, oauthTokenEndpointUri, acks, additionalConfig, headers);
82+
return new KafkaProducerConfig(bootstrapServers, topic, delay, messageCount, message, trustStorePassword, trustStorePath,
83+
keyStorePassword, keyStorePath, oauthClientId, oauthClientSecret, oauthAccessToken, oauthRefreshToken,
84+
oauthTokenEndpointUri, acks, additionalConfig, headers);
7985
}
8086

8187
public static Properties createProperties(KafkaProducerConfig config) {
@@ -85,20 +91,6 @@ public static Properties createProperties(KafkaProducerConfig config) {
8591
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
8692
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
8793

88-
if (!config.getAdditionalConfig().isEmpty()) {
89-
StringTokenizer tok = new StringTokenizer(config.getAdditionalConfig(), ", \t\n\r");
90-
while (tok.hasMoreTokens()) {
91-
String record = tok.nextToken();
92-
int endIndex = record.indexOf('=');
93-
if (endIndex == -1) {
94-
throw new RuntimeException("Failed to parse Map from String");
95-
}
96-
String key = record.substring(0, endIndex);
97-
String value = record.substring(endIndex + 1);
98-
props.put(key.trim(), value.trim());
99-
}
100-
}
101-
10294
if (config.getTrustStorePassword() != null && config.getTrustStorePath() != null) {
10395
log.info("Configuring truststore");
10496
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
@@ -115,15 +107,36 @@ public static Properties createProperties(KafkaProducerConfig config) {
115107
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, config.getKeyStorePath());
116108
}
117109

110+
Properties additionalProps = new Properties();
111+
if (!config.getAdditionalConfig().isEmpty()) {
112+
StringTokenizer tok = new StringTokenizer(config.getAdditionalConfig(), System.lineSeparator());
113+
while (tok.hasMoreTokens()) {
114+
String record = tok.nextToken();
115+
int endIndex = record.indexOf('=');
116+
if (endIndex == -1) {
117+
throw new RuntimeException("Failed to parse Map from String");
118+
}
119+
String key = record.substring(0, endIndex);
120+
String value = record.substring(endIndex + 1);
121+
additionalProps.put(key.trim(), value.trim());
122+
}
123+
}
124+
118125
if ((config.getOauthAccessToken() != null)
119126
|| (config.getOauthTokenEndpointUri() != null && config.getOauthClientId() != null && config.getOauthRefreshToken() != null)
120127
|| (config.getOauthTokenEndpointUri() != null && config.getOauthClientId() != null && config.getOauthClientSecret() != null)) {
128+
log.info("Configuring OAuth");
121129
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
122130
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL".equals(props.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) ? "SASL_SSL" : "SASL_PLAINTEXT");
123131
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
124-
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
132+
if (!(additionalProps.containsKey(SaslConfigs.SASL_MECHANISM) && additionalProps.getProperty(SaslConfigs.SASL_MECHANISM).equals("PLAIN"))) {
133+
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, config.saslLoginCallbackClass);
134+
}
125135
}
126136

137+
// override properties with defined additional properties
138+
props.putAll(additionalProps);
139+
127140
return props;
128141
}
129142
public String getBootstrapServers() {

0 commit comments

Comments
 (0)