Skip to content

Commit 205d594

Browse files
Added support for opentelemetry (#83)
* Add OpenTelemetry support Signed-off-by: Owen <owencorrigan76@gmail.com>
1 parent baf0a64 commit 205d594

19 files changed

+471
-37
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
RELEASE_VERSION ?= latest
22

3-
SUBDIRS=java/kafka/consumer java/kafka/producer java/kafka/streams http/vertx/java-http-vertx-consumer http/vertx/java-http-vertx-producer
3+
SUBDIRS=java/kafka/common java/kafka/consumer java/kafka/producer java/kafka/streams http/vertx/java-http-vertx-consumer http/vertx/java-http-vertx-producer
44
DOCKER_TARGETS=docker_build docker_push docker_tag
55

66
all: $(SUBDIRS)

README.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ Producer
7373
* `BLOCKING_PRODUCER` - if it's set, the producer will block another message until ack will be received
7474
* `MESSAGES_PER_TRANSACTION` - how many messages will be part of one transaction. Transaction config could be set via `ADDITIONAL_CONFIG` variable. Default is 10.
7575
* `ADDITIONAL_CONFIG` - additional configuration for a producer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character
76+
* `TRACING_SYSTEM` - if it's set to `jaeger` or `opentelemetry`, this will enable tracing.
7677

7778
Consumer
7879
* `BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
@@ -84,6 +85,7 @@ Consumer
8485
* `USER_KEY` - the user's private key
8586
* `LOG_LEVEL` - logging level
8687
* `ADDITIONAL_CONFIG` - additional configuration for a consumer application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character
88+
* `TRACING_SYSTEM` - if it's set to `jaeger` or `opentelemetry`, this will enable tracing.
8789

8890
Streams
8991
* `BOOTSTRAP_SERVERS` - comma-separated host and port pairs that is a list of Kafka broker addresses. The form of pair is `host:port`, e.g. `my-cluster-kafka-bootstrap:9092`
@@ -96,10 +98,13 @@ Streams
9698
* `USER_KEY` - the user's private key
9799
* `LOG_LEVEL` - logging level
98100
* `ADDITIONAL_CONFIG` - additional configuration for a streams application. Notice, that you can also override any previously set variable by setting this. The form is `key=value` records separated by new line character.
101+
* `TRACING_SYSTEM` - if it's set to `jaeger` or `opentelemetry`, this will enable tracing.
99102

100103
### Tracing
101104

102-
The examples support tracing using the [OpenTracing Apache Kafka Instrumentation](https://github.com/opentracing-contrib/java-kafka-client) and the [Jaeger project](https://www.jaegertracing.io/).
105+
The examples support tracing using the [OpenTracing Apache Kafka Instrumentation](https://github.com/opentracing-contrib/java-kafka-client),
106+
[OpenTelemetry Java Instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation) and the [Jaeger project](https://www.jaegertracing.io/).
103107
To enable tracing, configure the Jaeger Tracer using [environment variables](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment).
104108

105-
You can also use the provided example in [`deployment-tracing.yaml`](./java/kafka/deployment-tracing.yaml).
109+
To run Jaeger Tracing, you can also use the provided example in [`deployment-tracing-jaeger.yaml`](./java/kafka/deployment-tracing-jaeger.yaml).
110+
To run Opentelemetry Tracing, you can also use the provided example in [`deployment-tracing-opentelemetry.yaml`](./java/kafka/deployment-tracing-opentelemetry.yaml).

java/kafka/common/Makefile

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
PROJECT_NAME=java-kafka-common
2+
3+
all: java_install
4+
build: java_install
5+
clean: java_clean
6+
7+
docker_build:
8+
9+
docker_tag:
10+
11+
docker_push:
12+
13+
java_install:
14+
echo "Installing root pom ..."
15+
mvn install -f ../../../pom.xml
16+
17+
include ../../../Makefile.maven
18+
19+
.PHONY: all build clean

java/kafka/common/pom.xml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<dependencies>
7+
<dependency>
8+
<groupId>io.opentracing</groupId>
9+
<artifactId>opentracing-util</artifactId>
10+
<version>0.33.0</version>
11+
<scope>compile</scope>
12+
</dependency>
13+
<dependency>
14+
<groupId>io.jaegertracing</groupId>
15+
<artifactId>jaeger-core</artifactId>
16+
<version>1.1.0</version>
17+
</dependency>
18+
<dependency>
19+
<groupId>io.opentelemetry</groupId>
20+
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.apache.logging.log4j</groupId>
24+
<artifactId>log4j-api</artifactId>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.slf4j</groupId>
28+
<artifactId>slf4j-api</artifactId>
29+
<version>1.7.30</version>
30+
<scope>compile</scope>
31+
</dependency>
32+
</dependencies>
33+
34+
<parent>
35+
<groupId>strimzi.io</groupId>
36+
<artifactId>java-kafka-examples</artifactId>
37+
<version>1.0-SNAPSHOT</version>
38+
</parent>
39+
<artifactId>common</artifactId>
40+
41+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
6+
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
7+
import io.opentracing.Tracer;
8+
import io.opentracing.util.GlobalTracer;
9+
import io.jaegertracing.Configuration;
10+
11+
public class TracingInitializer {
12+
13+
public static Tracer jaegerInitialize() {
14+
Tracer tracer = Configuration.fromEnv().getTracer();
15+
GlobalTracer.registerIfAbsent(tracer);
16+
return tracer;
17+
}
18+
19+
public static void otelInitialize() {
20+
AutoConfiguredOpenTelemetrySdk.initialize();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
6+
public enum TracingSystem {
7+
JAEGER,
8+
OPENTELEMETRY,
9+
NONE;
10+
11+
public static TracingSystem forValue(String value) {
12+
switch (value) {
13+
case "jaeger":
14+
return TracingSystem.JAEGER;
15+
case "opentelemetry":
16+
return TracingSystem.OPENTELEMETRY;
17+
default:
18+
return TracingSystem.NONE;
19+
}
20+
}
21+
}

java/kafka/consumer/pom.xml

+20
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
<artifactId>java-kafka-consumer</artifactId>
1313

1414
<dependencies>
15+
<dependency>
16+
<groupId>strimzi.io</groupId>
17+
<artifactId>common</artifactId>
18+
<version>${project.version}</version>
19+
</dependency>
1520
<dependency>
1621
<groupId>org.apache.kafka</groupId>
1722
<artifactId>kafka-clients</artifactId>
@@ -39,6 +44,21 @@
3944
<groupId>io.opentracing.contrib</groupId>
4045
<artifactId>opentracing-kafka-client</artifactId>
4146
</dependency>
47+
<dependency>
48+
<groupId>io.opentelemetry</groupId>
49+
<artifactId>opentelemetry-exporter-jaeger</artifactId>
50+
<version>${opentelemetry.version}</version>
51+
</dependency>
52+
<dependency>
53+
<groupId>io.opentelemetry.instrumentation</groupId>
54+
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
55+
<version>${opentelemetry-alpha.version}</version>
56+
</dependency>
57+
<dependency>
58+
<groupId>io.opentelemetry</groupId>
59+
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
60+
<version>${opentelemetry-alpha.version}</version>
61+
</dependency>
4262
<dependency>
4363
<groupId>io.strimzi</groupId>
4464
<artifactId>kafka-oauth-client</artifactId>

java/kafka/consumer/src/main/java/KafkaConsumerConfig.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ public class KafkaConsumerConfig {
3434
private final String oauthTokenEndpointUri;
3535
private final String additionalConfig;
3636
private final String saslLoginCallbackClass = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
37-
37+
private final TracingSystem tracingSystem;
3838

3939
public KafkaConsumerConfig(String bootstrapServers, String topic, String groupId, String clientRack, Long messageCount,
4040
String sslTruststoreCertificates, String sslKeystoreKey, String sslKeystoreCertificateChain,
4141
String oauthClientId, String oauthClientSecret, String oauthAccessToken, String oauthRefreshToken,
42-
String oauthTokenEndpointUri, String additionalConfig) {
42+
String oauthTokenEndpointUri, String additionalConfig, TracingSystem tracingSystem) {
4343
this.bootstrapServers = bootstrapServers;
4444
this.topic = topic;
4545
this.groupId = groupId;
@@ -54,6 +54,7 @@ public KafkaConsumerConfig(String bootstrapServers, String topic, String groupId
5454
this.oauthRefreshToken = oauthRefreshToken;
5555
this.oauthTokenEndpointUri = oauthTokenEndpointUri;
5656
this.additionalConfig = additionalConfig;
57+
this.tracingSystem = tracingSystem;
5758
}
5859

5960
public static KafkaConsumerConfig fromEnv() {
@@ -71,10 +72,11 @@ public static KafkaConsumerConfig fromEnv() {
7172
String oauthRefreshToken = System.getenv("OAUTH_REFRESH_TOKEN");
7273
String oauthTokenEndpointUri = System.getenv("OAUTH_TOKEN_ENDPOINT_URI");
7374
String additionalConfig = System.getenv().getOrDefault("ADDITIONAL_CONFIG", "");
75+
TracingSystem tracingSystem = TracingSystem.forValue(System.getenv("TRACING_SYSTEM"));
7476

7577
return new KafkaConsumerConfig(bootstrapServers, topic, groupId, clientRack, messageCount, sslTruststoreCertificates, sslKeystoreKey,
7678
sslKeystoreCertificateChain, oauthClientId, oauthClientSecret, oauthAccessToken, oauthRefreshToken, oauthTokenEndpointUri,
77-
additionalConfig);
79+
additionalConfig, tracingSystem);
7880
}
7981

8082
public static Properties createProperties(KafkaConsumerConfig config) {
@@ -201,6 +203,8 @@ public String getAdditionalConfig() {
201203
return additionalConfig;
202204
}
203205

206+
public TracingSystem getTracingSystem() { return tracingSystem; }
207+
204208
@Override
205209
public String toString() {
206210
return "KafkaConsumerConfig{" +
@@ -220,6 +224,7 @@ public String toString() {
220224
", oauthRefreshToken='" + oauthRefreshToken + '\'' +
221225
", oauthTokenEndpointUri='" + oauthTokenEndpointUri + '\'' +
222226
", additionalConfig='" + additionalConfig + '\'' +
227+
", tracingSystem='" + tracingSystem + '\'' +
223228
'}';
224229
}
225230
}

java/kafka/consumer/src/main/java/KafkaConsumerExample.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
44
*/
55

6-
import io.jaegertracing.Configuration;
7-
import io.opentracing.Tracer;
8-
import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
9-
import io.opentracing.util.GlobalTracer;
106
import org.apache.kafka.clients.consumer.ConsumerConfig;
117
import org.apache.kafka.clients.consumer.ConsumerRecord;
128
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -30,11 +26,19 @@ public static void main(String[] args) {
3026
Properties props = KafkaConsumerConfig.createProperties(config);
3127
int receivedMsgs = 0;
3228

33-
if (System.getenv("JAEGER_SERVICE_NAME") != null) {
34-
Tracer tracer = Configuration.fromEnv().getTracer();
35-
GlobalTracer.registerIfAbsent(tracer);
29+
TracingSystem tracingSystem = config.getTracingSystem();
30+
if (tracingSystem != TracingSystem.NONE) {
31+
if (tracingSystem == TracingSystem.JAEGER) {
32+
TracingInitializer.jaegerInitialize();
3633

37-
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
34+
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, io.opentracing.contrib.kafka.TracingConsumerInterceptor.class.getName());
35+
} else if (tracingSystem == TracingSystem.OPENTELEMETRY) {
36+
TracingInitializer.otelInitialize();
37+
38+
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor.class.getName());
39+
} else {
40+
log.error("Error: TRACING_SYSTEM {} is not recognized or supported!", tracingSystem);
41+
}
3842
}
3943

4044
boolean commit = !Boolean.parseBoolean(config.getEnableAutoCommit());

java/kafka/deployment-tracing.yaml java/kafka/deployment-tracing-jaeger.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ spec:
5656
value: const
5757
- name: JAEGER_SAMPLER_PARAM
5858
value: "1"
59+
- name: TRACING_SYSTEM
60+
value: jaeger
5961
---
6062
apiVersion: apps/v1
6163
kind: Deployment
@@ -95,6 +97,8 @@ spec:
9597
value: const
9698
- name: JAEGER_SAMPLER_PARAM
9799
value: "1"
100+
- name: TRACING_SYSTEM
101+
value: jaeger
98102
---
99103
apiVersion: apps/v1
100104
kind: Deployment
@@ -134,3 +138,5 @@ spec:
134138
value: const
135139
- name: JAEGER_SAMPLER_PARAM
136140
value: "1"
141+
- name: TRACING_SYSTEM
142+
value: jaeger

0 commit comments

Comments
 (0)