diff --git a/instrumentation/grpc/README.md b/instrumentation/grpc/README.md index bf5788d3c5..2119f44104 100644 --- a/instrumentation/grpc/README.md +++ b/instrumentation/grpc/README.md @@ -60,6 +60,28 @@ overrideSpanName = new GrpcClientParser() { }; ``` +## gRPC Propagation Format (Census interop) + +gRPC defines a [binary encoded propagation format](https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding.md) which is implemented +by [OpenCensus](https://opencensus.io/) instrumentation. When this is +the case, incoming requests will have two metadata keys "grpc-trace-bin" +and "grpc-tags-bin". + +When enabled, this component can extract trace contexts from these +metadata and also write the same keys on outgoing calls. This allows +transparent interop when both census and brave report data to the same +tracing system. + +To enable this feature, set `grpcPropagationFormatEnabled` which is off +by default: +```java +grpcTracing = GrpcTracing.newBuilder(tracing) + .grpcPropagationFormatEnabled(true).build(); +``` + +Warning: the format of both "grpc-trace-bin" and "grpc-tags-bin" are +version 0. As such, consider this feature experimental. + ## Development If you are working on this module, then you need to run `mvn install` to first compile the protos. Once the protos are compiled, then can be found in the directories: diff --git a/instrumentation/grpc/pom.xml b/instrumentation/grpc/pom.xml index e95b54e37c..955b88dcd1 100644 --- a/instrumentation/grpc/pom.xml +++ b/instrumentation/grpc/pom.xml @@ -1,4 +1,6 @@ - + io.zipkin.brave brave-instrumentation-parent @@ -12,6 +14,7 @@ ${project.basedir}/../.. 3.5.1 + 0.14.0 1.5.0.Final 0.5.1 @@ -23,11 +26,6 @@ ${grpc.version} provided - - com.google.auto.value - auto-value - provided - javax.annotation @@ -44,6 +42,18 @@ brave-context-log4j2 test + + io.opencensus + opencensus-impl-lite + ${opencensus.version} + test + + + io.opencensus + opencensus-testing + ${opencensus.version} + test + @@ -94,7 +104,8 @@ io.grpc:grpc-all:1.2.0:jar - com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier} + com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier} + diff --git a/instrumentation/grpc/src/it/grpc12/pom.xml b/instrumentation/grpc/src/it/grpc12/pom.xml index 7305855f0a..f934eab3a2 100644 --- a/instrumentation/grpc/src/it/grpc12/pom.xml +++ b/instrumentation/grpc/src/it/grpc12/pom.xml @@ -64,6 +64,16 @@ + + maven-compiler-plugin + @maven-compiler-plugin.version@ + + + + **/ITCensusInterop*.java + + + maven-failsafe-plugin @maven-failsafe-plugin.version@ diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java new file mode 100644 index 0000000000..54f4a3b8a5 --- /dev/null +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java @@ -0,0 +1,191 @@ +package brave.grpc; + +import brave.internal.MapPropagationFields; +import brave.internal.Nullable; +import brave.internal.PropagationFieldsFactory; +import brave.propagation.Propagation; +import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Extractor; +import brave.propagation.TraceContext.Injector; +import brave.propagation.TraceContextOrSamplingFlags; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import java.util.List; +import java.util.Map; + +/** see {@link GrpcTracing.Builder#grpcPropagationFormatEnabled} for documentation. */ +final class GrpcPropagation implements Propagation { + + /** + * This creates a compatible metadata key based on Census, except this extracts a brave trace + * context as opposed to a census span context + */ + static final Metadata.Key GRPC_TRACE_BIN = + Metadata.Key.of("grpc-trace-bin", new TraceContextBinaryMarshaller()); + + /** This stashes the tag context in "extra" so it isn't lost */ + static final Metadata.Key> GRPC_TAGS_BIN = + Metadata.Key.of("grpc-tags-bin", new TagContextBinaryMarshaller()); + + /** The census tag key corresponding to the {@link MethodDescriptor#fullMethodName}. */ + static final String RPC_METHOD = "method"; + + static Propagation.Factory newFactory(Propagation.Factory delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + return new Factory(delegate); + } + + static final class Factory extends Propagation.Factory { + final Propagation.Factory delegate; + final TagsFactory tagsFactory = new TagsFactory(); + + Factory(Propagation.Factory delegate) { + this.delegate = delegate; + } + + @Override + public boolean supportsJoin() { + return false; + } + + @Override + public boolean requires128BitTraceId() { + return true; + } + + @Override + public final Propagation create(KeyFactory keyFactory) { + return new GrpcPropagation<>(this, keyFactory); + } + + @Override + public TraceContext decorate(TraceContext context) { + TraceContext result = delegate.decorate(context); + return tagsFactory.decorate(result); + } + } + + final Propagation delegate; + final TagsFactory extraFactory; + + GrpcPropagation(Factory factory, KeyFactory keyFactory) { + this.delegate = factory.delegate.create(keyFactory); + this.extraFactory = factory.tagsFactory; + } + + @Override + public List keys() { + return delegate.keys(); + } + + @Override + public Injector injector(Setter setter) { + return new GrpcInjector<>(this, setter); + } + + @Override + public Extractor extractor(Getter getter) { + return new GrpcExtractor<>(this, getter); + } + + static final class GrpcInjector implements Injector { + final Injector delegate; + final Propagation.Setter setter; + + GrpcInjector(GrpcPropagation propagation, Setter setter) { + this.delegate = propagation.delegate.injector(setter); + this.setter = setter; + } + + @Override + public void inject(TraceContext traceContext, C carrier) { + if (carrier instanceof Metadata) { + ((Metadata) carrier).put(GRPC_TRACE_BIN, traceContext); + Tags tags = findTags(traceContext); + if (tags != null) ((Metadata) carrier).put(GRPC_TAGS_BIN, tags.toMap()); + } + delegate.inject(traceContext, carrier); + } + } + + @Nullable + static Tags findTags(TraceContext traceContext) { + List extra = traceContext.extra(); + for (int i = 0, length = extra.size(); i < length; i++) { + Object next = extra.get(i); + if (next instanceof GrpcPropagation.Tags) { + return (Tags) next; + } + } + return null; + } + + static final class GrpcExtractor implements Extractor { + final GrpcPropagation propagation; + final Extractor delegate; + final Propagation.Getter getter; + + GrpcExtractor(GrpcPropagation propagation, Getter getter) { + this.propagation = propagation; + this.delegate = propagation.delegate.extractor(getter); + this.getter = getter; + } + + @Override + public TraceContextOrSamplingFlags extract(C carrier) { + Tags tags = null; + if (carrier instanceof Metadata) { + TraceContext extractedTrace = ((Metadata) carrier).get(GRPC_TRACE_BIN); + Map extractedTags = ((Metadata) carrier).get(GRPC_TAGS_BIN); + if (extractedTags != null) { + tags = new Tags(extractedTags, extractedTags.remove(RPC_METHOD)); + } + if (extractedTrace != null) { + if (tags == null) return TraceContextOrSamplingFlags.create(extractedTrace); + return TraceContextOrSamplingFlags.newBuilder() + .addExtra(tags) + .context(extractedTrace) + .build(); + } + } + TraceContextOrSamplingFlags result = delegate.extract(carrier); + if (tags == null) return result; + return result.toBuilder().addExtra(tags).build(); + } + } + + static final class TagsFactory extends PropagationFieldsFactory { + @Override + protected Class type() { + return Tags.class; + } + + @Override + protected Tags create() { + return new Tags(); + } + + @Override + protected Tags create(Tags parent) { + return new Tags(parent); + } + } + + static final class Tags extends MapPropagationFields { + final String parentMethod; + + Tags() { + parentMethod = null; + } + + Tags(Tags parent) { + super(parent); + parentMethod = null; + } + + Tags(Map extracted, String parentMethod) { + super(extracted); + this.parentMethod = parentMethod; + } + } +} diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java index 84af55b9e2..b117efddab 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java @@ -2,63 +2,110 @@ import brave.ErrorParser; import brave.Tracing; -import com.google.auto.value.AutoValue; +import brave.propagation.Propagation; import io.grpc.ClientInterceptor; +import io.grpc.Metadata; import io.grpc.ServerInterceptor; -@AutoValue -public abstract class GrpcTracing { +public final class GrpcTracing { public static GrpcTracing create(Tracing tracing) { - return newBuilder(tracing).build(); + if (tracing == null) throw new NullPointerException("tracing == null"); + return new Builder(tracing).build(); } public static Builder newBuilder(Tracing tracing) { - ErrorParser errorParser = tracing.errorParser(); - return new AutoValue_GrpcTracing.Builder().tracing(tracing) - // override to re-use any custom error parser from the tracing component - .clientParser(new GrpcClientParser() { - @Override protected ErrorParser errorParser() { - return errorParser; - } - }) - .serverParser(new GrpcServerParser() { - @Override protected ErrorParser errorParser() { - return errorParser; - } - }); + return new Builder(tracing); } - abstract Tracing tracing(); + public static final class Builder { + final Tracing tracing; + GrpcClientParser clientParser; + GrpcServerParser serverParser; + boolean grpcPropagationFormatEnabled = false; - abstract GrpcClientParser clientParser(); + Builder(Tracing tracing) { + this.tracing = tracing; + // override to re-use any custom error parser from the tracing component + ErrorParser errorParser = tracing.errorParser(); + clientParser = new GrpcClientParser() { + @Override protected ErrorParser errorParser() { + return errorParser; + } + }; + serverParser = new GrpcServerParser() { + @Override protected ErrorParser errorParser() { + return errorParser; + } + }; + } - abstract GrpcServerParser serverParser(); + public Builder clientParser(GrpcClientParser clientParser) { + if (clientParser == null) throw new NullPointerException("clientParser == null"); + this.clientParser = clientParser; + return this; + } - public abstract Builder toBuilder(); + public Builder serverParser(GrpcServerParser serverParser) { + if (serverParser == null) throw new NullPointerException("serverParser == null"); + this.serverParser = serverParser; + return this; + } - /** This interceptor traces outbound calls */ - public final ClientInterceptor newClientInterceptor() { - return new TracingClientInterceptor(this); - } + /** + * When true, "grpc-trace-bin" is preferred when extracting trace context. This is useful when + * OpenCensus implements tracing upstream or downstream. + * Default is false. + * + *

This wraps an existing propagation implementation, but prefers extracting "grpc-trace-bin" + * and "grpc-tags-bin" when parsing gRPC metadata. The incoming service method is propagated to + * outgoing client requests and written in the tags context as the key named "method". + * Regardless of whether "grpc-trace-bin" was parsed, it is speculatively written on outgoing + * requests. + * + *

Warning: the format of both "grpc-trace-bin" and "grpc-tags-bin" are version 0. As such, + * consider this feature experimental. + */ + public Builder grpcPropagationFormatEnabled(boolean grpcPropagationFormatEnabled) { + this.grpcPropagationFormatEnabled = grpcPropagationFormatEnabled; + return this; + } - /** This interceptor traces inbound calls */ - public ServerInterceptor newServerInterceptor() { - return new TracingServerInterceptor(this); + public GrpcTracing build() { + return new GrpcTracing(this); + } } - @AutoValue.Builder public static abstract class Builder { - abstract Builder tracing(Tracing tracing); + final Tracing tracing; + final Propagation> propagation; + final GrpcClientParser clientParser; + final GrpcServerParser serverParser; + final boolean grpcPropagationFormatEnabled; - public abstract Builder clientParser(GrpcClientParser clientParser); - - public abstract Builder serverParser(GrpcServerParser serverParser); + GrpcTracing(Builder builder) { // intentionally hidden constructor + tracing = builder.tracing; + grpcPropagationFormatEnabled = builder.grpcPropagationFormatEnabled; + Propagation.Factory propagationFactory = tracing.propagationFactory(); + if (grpcPropagationFormatEnabled) { + propagationFactory = GrpcPropagation.newFactory(propagationFactory); + } + propagation = propagationFactory.create(AsciiMetadataKeyFactory.INSTANCE); + clientParser = builder.clientParser; + serverParser = builder.serverParser; + } - public abstract GrpcTracing build(); + public Builder toBuilder() { + return new Builder(tracing) + .clientParser(clientParser) + .serverParser(serverParser); + } - Builder() { - } + /** This interceptor traces outbound calls */ + public final ClientInterceptor newClientInterceptor() { + return new TracingClientInterceptor(this); } - GrpcTracing() { // intentionally hidden constructor + /** This interceptor traces inbound calls */ + public ServerInterceptor newServerInterceptor() { + return new TracingServerInterceptor(this); } } diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TagContextBinaryMarshaller.java b/instrumentation/grpc/src/main/java/brave/grpc/TagContextBinaryMarshaller.java new file mode 100644 index 0000000000..4b5611b369 --- /dev/null +++ b/instrumentation/grpc/src/main/java/brave/grpc/TagContextBinaryMarshaller.java @@ -0,0 +1,150 @@ +package brave.grpc; + +import io.grpc.Metadata.BinaryMarshaller; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.logging.Logger; + +import static java.util.logging.Level.FINE; + +/** + * This logs instead of throwing exceptions. + * + *

See + * https://github.com/census-instrumentation/opencensus-java/blob/master/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java + */ +final class TagContextBinaryMarshaller implements BinaryMarshaller> { + static final Logger logger = Logger.getLogger(TagContextBinaryMarshaller.class.getName()); + static final byte VERSION = 0, TAG_FIELD_ID = 0; + static final byte[] EMPTY_BYTES = {}; + + @Override + public byte[] toBytes(Map tagContext) { + if (tagContext == null) { + throw new NullPointerException("tagContext == null"); // programming error + } + if (tagContext.isEmpty()) return EMPTY_BYTES; + byte[] result = new byte[sizeInBytes(tagContext)]; + Buffer bytes = new Buffer(result); + bytes.writeByte(VERSION); + for (Map.Entry entry : tagContext.entrySet()) { + bytes.writeByte(TAG_FIELD_ID); + bytes.writeLengthPrefixed(entry.getKey()); + bytes.writeLengthPrefixed(entry.getValue()); + } + return result; + } + + @Override + public Map parseBytes(byte[] buf) { + if (buf == null) throw new NullPointerException("buf == null"); // programming error + if (buf.length == 0) return Collections.emptyMap(); + Buffer bytes = new Buffer(buf); + byte version = bytes.readByte(); + if (version != VERSION) { + logger.log(FINE, "Invalid input: unsupported version {0}", version); + return null; + } + + Map result = new LinkedHashMap<>(); + while (bytes.remaining() > 3) { // tag for field ID and two lengths + if (bytes.readByte() == TAG_FIELD_ID) { + String key = bytes.readLengthPrefixed(); + if (key == null) break; + String val = bytes.readLengthPrefixed(); + if (val == null) break; + result.put(key, val); + } else { + logger.log(FINE, "Invalid input: expected TAG_FIELD_ID at offset {0}", bytes.pos); + break; + } + } + return result; + } + + // like census, this currently assumes both key and value are ascii + static int sizeInBytes(Map tagContext) { + int sizeInBytes = 1; // VERSION + for (Map.Entry entry : tagContext.entrySet()) { + sizeInBytes++; // TAG_FIELD_ID + int keyLength = entry.getKey().length(); + int valLength = entry.getValue().length(); + if (keyLength > 16383 || valLength > 16383) return sizeInBytes; // stop here + sizeInBytes += sizeOfLengthPrefixedString(keyLength); + sizeInBytes += sizeOfLengthPrefixedString(valLength); + } + return sizeInBytes; + } + + static int sizeOfLengthPrefixedString(int length) { + return (length > 127 ? 2 : 1) + length; + } + + static final class Buffer { + final byte[] buf; + int pos; + + Buffer(byte[] buf) { + this.buf = buf; + } + + int remaining() { + return buf.length - pos; + } + + /** This needs to be checked externally to not overrun the underlying array */ + byte readByte() { + return buf[pos++]; + } + + void writeByte(int v) { + buf[pos++] = (byte) v; + } + + /** Works only when values are ascii */ + boolean writeLengthPrefixed(String value) { + int length = value.length(); + if (length > 16383) return false; // > 14bits is too big + + if (length > 127) { // varint encode over 2 bytes + buf[pos++] = (byte) ((length & 0x7f) | 0x80); + buf[pos++] = (byte) ((length >>> 7)); + } else { + buf[pos++] = (byte) length; + } + + for (int i = 0; i < length; i++) { + buf[pos++] = (byte) value.charAt(i); + } + + return true; + } + + String readLengthPrefixed() { + byte b1 = buf[pos++]; + if (b1 >= 0) { // negative means MSB set + return readAsciiString(b1); + } + return readAsciiString(readVarint(b1)); + } + + private int readVarint(byte b1) { + int b2 = buf[pos++]; + if ((b2 & 0xf0) != 0) { + logger.log(FINE, "Greater than 14-bit varint at position {0}", pos); + return -1; + } + return b1 & 0x7f | b2 << 28; + } + + String readAsciiString(int length) { + if (length == -1 || remaining() < length) return null; + char[] string = new char[length]; + for (int i = 0; i < length; i++) { + string[i] = (char) buf[pos++]; + } + return new String(string); + } + } +} diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TraceContextBinaryMarshaller.java b/instrumentation/grpc/src/main/java/brave/grpc/TraceContextBinaryMarshaller.java new file mode 100644 index 0000000000..d72374b6f3 --- /dev/null +++ b/instrumentation/grpc/src/main/java/brave/grpc/TraceContextBinaryMarshaller.java @@ -0,0 +1,115 @@ +package brave.grpc; + +import brave.propagation.TraceContext; +import io.grpc.Metadata.BinaryMarshaller; +import java.util.logging.Logger; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.logging.Level.FINE; + +/** + * This logs instead of throwing exceptions. + * + *

See + * https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding.md + */ +final class TraceContextBinaryMarshaller implements BinaryMarshaller { + static final Logger logger = Logger.getLogger(TraceContextBinaryMarshaller.class.getName()); + static final byte VERSION = 0, + TRACE_ID_FIELD_ID = 0, + SPAN_ID_FIELD_ID = 1, + TRACE_OPTION_FIELD_ID = 2; + + private static final int FORMAT_LENGTH = + 4 /* version + 3 fields */ + 16 /* trace ID */ + 8 /* span ID */ + 1 /* sampled bit */; + + @Override + public byte[] toBytes(TraceContext traceContext) { + checkNotNull(traceContext, "traceContext"); + byte[] bytes = new byte[FORMAT_LENGTH]; + bytes[0] = VERSION; + bytes[1] = TRACE_ID_FIELD_ID; + writeLong(bytes, 2, traceContext.traceIdHigh()); + writeLong(bytes, 10, traceContext.traceId()); + bytes[18] = SPAN_ID_FIELD_ID; + writeLong(bytes, 19, traceContext.spanId()); + bytes[27] = TRACE_OPTION_FIELD_ID; + if (traceContext.sampled() != null && traceContext.sampled()) { + bytes[28] = 1; + } + return bytes; + } + + @Override + public TraceContext parseBytes(byte[] bytes) { + if (bytes == null) throw new NullPointerException("bytes == null"); // programming error + if (bytes.length == 0) return null; + if (bytes[0] != VERSION) { + logger.log(FINE, "Invalid input: unsupported version {0}", bytes[0]); + return null; + } + if (bytes.length < FORMAT_LENGTH - 2 /* sampled field + bit is optional */) { + logger.fine("Invalid input: truncated"); + return null; + } + long traceIdHigh, traceId, spanId; + Boolean sampled = null; + int pos = 1; + if (bytes[pos] == TRACE_ID_FIELD_ID) { + pos++; + traceIdHigh = readLong(bytes, pos); + traceId = readLong(bytes, pos + 8); + pos += 16; + } else { + logger.log(FINE, "Invalid input: expected trace ID at offset {0}", pos); + return null; + } + if (bytes[pos] == SPAN_ID_FIELD_ID) { + pos++; + spanId = readLong(bytes, pos); + pos += 8; + } else { + logger.log(FINE, "Invalid input: expected span ID at offset {0}", pos); + return null; + } + // The trace options field is optional. However, when present, it should be valid. + if (bytes.length > pos && bytes[pos] == TRACE_OPTION_FIELD_ID) { + pos++; + if (bytes.length < pos + 1) { + logger.log(FINE, "Invalid input: truncated"); + return null; + } + sampled = bytes[pos] == 1; + } + return TraceContext.newBuilder() + .traceIdHigh(traceIdHigh) + .traceId(traceId) + .spanId(spanId) + .sampled(sampled) + .build(); + } + + /** Inspired by {@code okio.Buffer.writeLong} */ + static void writeLong(byte[] data, int pos, long v) { + data[pos + 0] = (byte) ((v >>> 56L) & 0xff); + data[pos + 1] = (byte) ((v >>> 48L) & 0xff); + data[pos + 2] = (byte) ((v >>> 40L) & 0xff); + data[pos + 3] = (byte) ((v >>> 32L) & 0xff); + data[pos + 4] = (byte) ((v >>> 24L) & 0xff); + data[pos + 5] = (byte) ((v >>> 16L) & 0xff); + data[pos + 6] = (byte) ((v >>> 8L) & 0xff); + data[pos + 7] = (byte) (v & 0xff); + } + + /** Inspired by {@code okio.Buffer.readLong} */ + static long readLong(byte[] data, int pos) { + return (data[pos] & 0xffL) << 56 + | (data[pos + 1] & 0xffL) << 48 + | (data[pos + 2] & 0xffL) << 40 + | (data[pos + 3] & 0xffL) << 32 + | (data[pos + 4] & 0xffL) << 24 + | (data[pos + 5] & 0xffL) << 16 + | (data[pos + 6] & 0xffL) << 8 + | (data[pos + 7] & 0xffL); + } +} diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java b/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java index d1a577e273..5c4cc58daa 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java @@ -3,7 +3,7 @@ import brave.Span; import brave.Tracer; import brave.propagation.Propagation.Setter; -import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Injector; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -29,14 +29,13 @@ final class TracingClientInterceptor implements ClientInterceptor { }; final Tracer tracer; - final TraceContext.Injector injector; + final Injector injector; final GrpcClientParser parser; TracingClientInterceptor(GrpcTracing grpcTracing) { - tracer = grpcTracing.tracing().tracer(); - injector = grpcTracing.tracing().propagationFactory() - .create(AsciiMetadataKeyFactory.INSTANCE).injector(SETTER); - parser = grpcTracing.clientParser(); + tracer = grpcTracing.tracing.tracer(); + injector = grpcTracing.propagation.injector(SETTER); + parser = grpcTracing.clientParser; } /** diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java b/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java index c1d71ee650..26c1c4113d 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java @@ -3,31 +3,42 @@ import brave.Span; import brave.Tracer; import brave.propagation.Propagation; -import brave.propagation.TraceContext; +import brave.propagation.TraceContext.Extractor; import brave.propagation.TraceContextOrSamplingFlags; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; import io.grpc.Metadata; +import io.grpc.Metadata.Key; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; +import static brave.grpc.GrpcPropagation.RPC_METHOD; + // not exposed directly as implementation notably changes between versions 1.2 and 1.3 final class TracingServerInterceptor implements ServerInterceptor { + static final Propagation.Getter> GETTER = + new Propagation.Getter>() { // retrolambda no like + @Override public String get(Metadata metadata, Key key) { + return metadata.get(key); + } + + @Override public String toString() { + return "Metadata::get"; + } + }; + final Tracer tracer; - final TraceContext.Extractor extractor; + final Extractor extractor; final GrpcServerParser parser; + final boolean grpcPropagationFormatEnabled; TracingServerInterceptor(GrpcTracing grpcTracing) { - tracer = grpcTracing.tracing().tracer(); - extractor = grpcTracing.tracing().propagationFactory().create(AsciiMetadataKeyFactory.INSTANCE) - .extractor(new Propagation.Getter>() { // retrolambda no like - @Override public String get(Metadata metadata, Metadata.Key key) { - return metadata.get(key); - } - }); - parser = grpcTracing.serverParser(); + tracer = grpcTracing.tracing.tracer(); + extractor = grpcTracing.propagation.extractor(GETTER); + parser = grpcTracing.serverParser; + grpcPropagationFormatEnabled = grpcTracing.grpcPropagationFormatEnabled; } @Override @@ -38,6 +49,12 @@ public ServerCall.Listener interceptCall(final ServerCall responseObserver) { + TraceContext currentTraceContext = tracing != null ? tracing.currentTraceContext().get() : null; if (req.getName().equals("bad")) { responseObserver.onError(new IllegalArgumentException()); return; } - String message = tracing != null && tracing.currentTraceContext().get() != null - ? tracing.currentTraceContext().get().traceIdString() - : ""; + String message = currentTraceContext != null ? currentTraceContext.traceIdString() : ""; HelloReply reply = HelloReply.newBuilder().setMessage(message).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); } @Override - public void sayHelloWithManyReplies(HelloRequest request, StreamObserver responseObserver) { + public void sayHelloWithManyReplies( + HelloRequest request, StreamObserver responseObserver) { for (int i = 0; i < 10; i++) { responseObserver.onNext(HelloReply.newBuilder().setMessage("reply " + i).build()); } diff --git a/instrumentation/grpc/src/test/java/brave/grpc/ITCensusInterop.java b/instrumentation/grpc/src/test/java/brave/grpc/ITCensusInterop.java new file mode 100644 index 0000000000..4b8349d005 --- /dev/null +++ b/instrumentation/grpc/src/test/java/brave/grpc/ITCensusInterop.java @@ -0,0 +1,211 @@ +package brave.grpc; + +import brave.Tracing; +import brave.internal.Nullable; +import brave.propagation.B3Propagation; +import brave.propagation.StrictCurrentTraceContext; +import brave.propagation.TraceContext; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.InternalNettyServerBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; +import io.opencensus.common.Scope; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.implcore.tags.TagContextImpl; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; +import io.opencensus.testing.export.TestHandler; +import io.opencensus.trace.config.TraceParams; +import io.opencensus.trace.export.SpanData; +import io.opencensus.trace.samplers.Samplers; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import zipkin2.Span; + +import static brave.grpc.GreeterImpl.HELLO_REQUEST; +import static io.grpc.ServerInterceptors.intercept; +import static io.opencensus.trace.AttributeValue.stringAttributeValue; +import static io.opencensus.trace.Tracing.getExportComponent; +import static io.opencensus.trace.Tracing.getTraceConfig; +import static org.assertj.core.api.Assertions.assertThat; + +public class ITCensusInterop { + + static class TagsGreeterImpl extends GreeterImpl { + TagsGreeterImpl(@Nullable GrpcTracing grpcTracing) { + super(grpcTracing); + } + + /** This verifies internal state by writing to both brave and census apis */ + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + Map censusTags = + ((TagContextImpl) Tags.getTagger().getCurrentTagContext()).getTags(); + + // Read in-process tags from census and write to both span apis + io.opencensus.trace.Span censusSpan = + io.opencensus.trace.Tracing.getTracer().getCurrentSpan(); + for (Map.Entry entry : censusTags.entrySet()) { + spanCustomizer.tag(entry.getKey().getName(), entry.getValue().asString()); + censusSpan.putAttribute( + entry.getKey().getName(), stringAttributeValue(entry.getValue().asString())); + } + + // Read in-process tags from brave's grpc hooks and write to both span apis + TraceContext currentTraceContext = + tracing != null ? tracing.currentTraceContext().get() : null; + Map braveTags = + currentTraceContext != null + ? GrpcPropagation.findTags(currentTraceContext).toMap() + : Collections.emptyMap(); + for (Map.Entry entry : braveTags.entrySet()) { + spanCustomizer.tag(entry.getKey(), entry.getValue()); + censusSpan.putAttribute(entry.getKey(), stringAttributeValue(entry.getValue())); + } + super.sayHello(req, responseObserver); + } + } + + final TestHandler testHandler = new TestHandler(); + + @Before + public void beforeClass() { + getTraceConfig() + .updateActiveTraceParams( + TraceParams.DEFAULT.toBuilder().setSampler(Samplers.alwaysSample()).build()); + getExportComponent().getSpanExporter().registerHandler("test", testHandler); + } + + /** See brave.http.ITHttp for rationale on using a concurrent blocking queue */ + BlockingQueue spans = new LinkedBlockingQueue<>(); + + Tracing tracing = + Tracing.newBuilder() + .propagationFactory(GrpcPropagation.newFactory(B3Propagation.FACTORY)) + .spanReporter(spans::add) + .currentTraceContext(new StrictCurrentTraceContext()) + .build(); + GrpcTracing grpcTracing = GrpcTracing.create(tracing); + + Server server; + ManagedChannel client; + + @Test + public void readsCensusPropagation() throws Exception { + initServer(true); // trace server with brave + initClient(false); // trace client with census + + GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); + + // this takes 5 seconds due to hard-coding in ExportComponentImpl + SpanData clientSpan = testHandler.waitForExport(1).get(0); + + Span serverSpan = spans.take(); + assertThat(clientSpan.getContext().getTraceId().toLowerBase16()) + .isEqualTo(serverSpan.traceId()); + assertThat(clientSpan.getContext().getSpanId().toLowerBase16()) + .isEqualTo(serverSpan.parentId()); + assertThat(serverSpan.tags()).containsEntry("method", "helloworld.Greeter/SayHello"); + } + + @Test + public void readsCensusPropagation_withIncomingMethod() throws Exception { + initServer(true); // trace server with brave + initClient(false); // trace client with census + + try (Scope tagger = + Tags.getTagger() + .emptyBuilder() + .put(RpcMeasureConstants.RPC_METHOD, TagValue.create("edge.Ingress/InitialRoute")) + .buildScoped()) { + GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); + } + + // this takes 5 seconds due to hard-coding in ExportComponentImpl + SpanData clientSpan = testHandler.waitForExport(1).get(0); + + Span serverSpan = spans.take(); + assertThat(clientSpan.getContext().getTraceId().toLowerBase16()) + .isEqualTo(serverSpan.traceId()); + assertThat(clientSpan.getContext().getSpanId().toLowerBase16()) + .isEqualTo(serverSpan.parentId()); + assertThat(serverSpan.tags()).containsEntry("method", "helloworld.Greeter/SayHello"); + } + + @Test + public void writesCensusPropagation() throws Exception { + initServer(false); // trace server with census + initClient(true); // trace client with brave + + GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); + + // this takes 5 seconds due to hard-coding in ExportComponentImpl + SpanData serverSpan = testHandler.waitForExport(1).get(0); + + Span clientSpan = spans.take(); + assertThat(clientSpan.traceId()) + .isEqualTo(serverSpan.getContext().getTraceId().toLowerBase16()); + assertThat(clientSpan.id()).isEqualTo(serverSpan.getParentSpanId().toLowerBase16()); + assertThat(serverSpan.getAttributes().getAttributeMap()) + .containsEntry("method", stringAttributeValue("helloworld.Greeter/SayHello")); + } + + void initServer(boolean traceWithBrave) throws Exception { + if (traceWithBrave) { + NettyServerBuilder builder = (NettyServerBuilder) ServerBuilder.forPort(PickUnusedPort.get()); + builder.addService( + intercept(new TagsGreeterImpl(grpcTracing), grpcTracing.newServerInterceptor())); + // TODO: track gRPC exposing this + InternalNettyServerBuilder.setTracingEnabled(builder, false); + server = builder.build(); + } else { + server = + ServerBuilder.forPort(PickUnusedPort.get()).addService(new TagsGreeterImpl(null)).build(); + } + server.start(); + } + + void initClient(boolean traceWithBrave) { + if (traceWithBrave) { + NettyChannelBuilder builder = + (NettyChannelBuilder) + ManagedChannelBuilder.forAddress("localhost", server.getPort()) + .intercept(grpcTracing.newClientInterceptor()) + .usePlaintext(); + // TODO: track gRPC exposing this + InternalNettyChannelBuilder.setTracingEnabled(builder, false); + client = builder.build(); + } else { + client = + ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); + } + } + + @After + public void close() throws Exception { + if (client != null) { + client.shutdown(); + client.awaitTermination(1, TimeUnit.SECONDS); + } + if (server != null) { + server.shutdown(); + server.awaitTermination(1, TimeUnit.SECONDS); + } + tracing.close(); + } +} diff --git a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java index be3c0314c2..e4b060242c 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java @@ -58,7 +58,7 @@ public class ITTracingClientInterceptor { BlockingQueue spans = new LinkedBlockingQueue<>(); GrpcTracing tracing = GrpcTracing.create(tracingBuilder(Sampler.ALWAYS_SAMPLE).build()); - Tracer tracer = tracing.tracing().tracer(); + Tracer tracer = tracing.tracing.tracer(); TestServer server = new TestServer(); ManagedChannel client; @@ -289,14 +289,14 @@ public void start(Listener responseListener, Metadata headers) { tracing = tracing.toBuilder().clientParser(new GrpcClientParser() { @Override protected void onMessageSent(M message, SpanCustomizer span) { span.tag("grpc.message_sent", message.toString()); - if (tracing.tracing().currentTraceContext().get() != null) { + if (tracing.tracing.currentTraceContext().get() != null) { span.tag("grpc.message_sent.visible", "true"); } } @Override protected void onMessageReceived(M message, SpanCustomizer span) { span.tag("grpc.message_received", message.toString()); - if (tracing.tracing().currentTraceContext().get() != null) { + if (tracing.tracing.currentTraceContext().get() != null) { span.tag("grpc.message_received.visible", "true"); } } diff --git a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java index e7ba3622de..147676b729 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java @@ -204,7 +204,7 @@ public void start(Listener responseListener, Metadata headers) { public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { testLogger.info("in span!"); - fromUserInterceptor.set(grpcTracing.tracing().currentTraceContext().get()); + fromUserInterceptor.set(grpcTracing.tracing.currentTraceContext().get()); return next.startCall(call, headers); } }); @@ -259,14 +259,14 @@ public void serverParserTest() throws Exception { grpcTracing = grpcTracing.toBuilder().serverParser(new GrpcServerParser() { @Override protected void onMessageSent(M message, SpanCustomizer span) { span.tag("grpc.message_sent", message.toString()); - if (grpcTracing.tracing().currentTraceContext().get() != null) { + if (grpcTracing.tracing.currentTraceContext().get() != null) { span.tag("grpc.message_sent.visible", "true"); } } @Override protected void onMessageReceived(M message, SpanCustomizer span) { span.tag("grpc.message_received", message.toString()); - if (grpcTracing.tracing().currentTraceContext().get() != null) { + if (grpcTracing.tracing.currentTraceContext().get() != null) { span.tag("grpc.message_received.visible", "true"); } } diff --git a/instrumentation/grpc/src/test/java/brave/grpc/TagContextBinaryMarshallerTest.java b/instrumentation/grpc/src/test/java/brave/grpc/TagContextBinaryMarshallerTest.java new file mode 100644 index 0000000000..5928a608cb --- /dev/null +++ b/instrumentation/grpc/src/test/java/brave/grpc/TagContextBinaryMarshallerTest.java @@ -0,0 +1,95 @@ +package brave.grpc; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TagContextBinaryMarshallerTest { + TagContextBinaryMarshaller binaryMarshaller = new TagContextBinaryMarshaller(); + + @Test + public void roundtrip() { + Map context = ImmutableMap.of("method", "foo"); + byte[] contextBytes = { + 0, // version + 0, // field number + 6, 'm', 'e', 't', 'h', 'o', 'd', // + 3, 'f', 'o', 'o' // + }; + byte[] serialized = binaryMarshaller.toBytes(context); + assertThat(serialized).containsExactly(contextBytes); + + assertThat(binaryMarshaller.parseBytes(serialized)).isEqualTo(context); + } + + @Test + public void roundtrip_multipleKeys() { + Map context = ImmutableMap.of("method", "foo", "user", "romeo"); + byte[] contextBytes = { + 0, // version + 0, // field number + 6, 'm', 'e', 't', 'h', 'o', 'd', // + 3, 'f', 'o', 'o', // + 0, // field number + 4, 'u', 's', 'e', 'r', // + 5, 'r', 'o', 'm', 'e', 'o' // + }; + byte[] serialized = binaryMarshaller.toBytes(context); + assertThat(serialized).containsExactly(contextBytes); + + assertThat(binaryMarshaller.parseBytes(serialized)).isEqualTo(context); + } + + @Test + public void parseBytes_empty() { + assertThat(binaryMarshaller.parseBytes(new byte[0])).isEmpty(); + } + + @Test + public void parseBytes_unsupportedVersionId_toEmpty() { + byte[] contextBytes = { + 1, // unsupported version + 0, // field number + 6, 'm', 'e', 't', 'h', 'o', 'd', // + 3, 'f', 'o', 'o' // + }; + assertThat(binaryMarshaller.parseBytes(contextBytes)).isNull(); + } + + @Test + public void parseBytes_unsupportedFieldIdFirst_empty() { + byte[] contextBytes = { + 0, // version + 1, // unsupported field number + 0, // field number + 6, 'm', 'e', 't', 'h', 'o', 'd', // + 3, 'f', 'o', 'o' // + }; + assertThat(binaryMarshaller.parseBytes(contextBytes)).isEmpty(); + } + + @Test + public void parseBytes_unsupportedFieldIdSecond_ignored() { + byte[] contextBytes = { + 0, // version + 0, // field number + 6, 'm', 'e', 't', 'h', 'o', 'd', // + 3, 'f', 'o', 'o', // + 1, // unsupported field number + }; + assertThat(binaryMarshaller.parseBytes(contextBytes)) + .isEqualTo(ImmutableMap.of("method", "foo")); + } + + @Test + public void parseBytes_truncatedDoesntCrash() { + byte[] contextBytes = { + 0, // version + 0, // field number + 6, 'm', 'e', 't', // truncated + }; + assertThat(binaryMarshaller.parseBytes(contextBytes)).isEmpty(); + } +} diff --git a/instrumentation/grpc/src/test/java/brave/grpc/TraceContextBinaryMarshallerTest.java b/instrumentation/grpc/src/test/java/brave/grpc/TraceContextBinaryMarshallerTest.java new file mode 100644 index 0000000000..87031612cc --- /dev/null +++ b/instrumentation/grpc/src/test/java/brave/grpc/TraceContextBinaryMarshallerTest.java @@ -0,0 +1,123 @@ +package brave.grpc; + +import brave.propagation.TraceContext; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests here are based on {@code io.opencensus.implcore.trace.propagation.BinaryFormatImplTest} + */ +public class TraceContextBinaryMarshallerTest { + TraceContext context = TraceContext.newBuilder() + .traceIdHigh(Long.MAX_VALUE).traceId(Long.MIN_VALUE) + .spanId(-1) + .sampled(true) + .build(); + byte[] contextBytes = { + 0, // version + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, // trace ID + 1, -1, -1, -1, -1, -1, -1, -1, -1, // span ID + 2, 1 // sampled + }; + + TraceContextBinaryMarshaller binaryMarshaller = new TraceContextBinaryMarshaller(); + + @Test public void roundtrip() { + byte[] serialized = binaryMarshaller.toBytes(context); + assertThat(serialized) + .containsExactly(contextBytes); + + assertThat(binaryMarshaller.parseBytes(serialized)) + .isEqualTo(context); + } + + @Test public void roundtrip_unsampled() { + context = context.toBuilder().sampled(false).build(); + + byte[] serialized = binaryMarshaller.toBytes(context); + contextBytes[contextBytes.length - 1] = 0; // unsampled + assertThat(serialized) + .containsExactly(contextBytes); + + assertThat(binaryMarshaller.parseBytes(serialized)) + .isEqualTo(context); + } + + @Test public void parseBytes_empty_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[0])) + .isNull(); + } + + @Test public void parseBytes_unsupportedVersionId_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 1, // bad version + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, + 1, -1, -1, -1, -1, -1, -1, -1, -1, + 2, 1 + })).isNull(); + } + + @Test public void parseBytes_unsupportedFieldIdFirst_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 4, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, // bad field number + 1, -1, -1, -1, -1, -1, -1, -1, -1, + 2, 1 + })).isNull(); + } + + @Test public void parseBytes_unsupportedFieldIdSecond_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, + 4, -1, -1, -1, -1, -1, -1, -1, -1, // bad field number + 2, 1 + })).isNull(); + } + + @Test public void parseBytes_unsupportedFieldIdThird_toSampledNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, + 1, -1, -1, -1, -1, -1, -1, -1, -1, + 4, 1 // bad field number + }).sampled()).isNull(); + } + + @Test public void parseBytes_64BitTraceId_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 0, 127, -1, -1, -1, -1, -1, -1, -1, // half a trace ID + 1, -1, -1, -1, -1, -1, -1, -1, -1, + 2, 1 + })).isNull(); + } + + @Test public void parseBytes_32BitSpanId_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, + 1, -1, -1, -1, -1, // half a span ID + 2, 1 + })).isNull(); + } + + @Test public void parseBytes_truncatedTraceOptions_toNull() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, + 1, -1, -1, -1, -1, -1, -1, -1, -1, + 2 // has field ID, but missing sampled bit + })).isNull(); + } + + @Test public void parseBytes_missingTraceOptions() { + assertThat(binaryMarshaller.parseBytes(new byte[] { + 0, + 0, 127, -1, -1, -1, -1, -1, -1, -1, -128, 0, 0, 0, 0, 0, 0, 0, + 1, -1, -1, -1, -1, -1, -1, -1, -1, + // no trace options field + })).isEqualTo(context); + } +} diff --git a/pom.xml b/pom.xml index 7f2ce68406..f9b46de7fb 100755 --- a/pom.xml +++ b/pom.xml @@ -59,8 +59,8 @@ 1.8 - 2.9.0 - 2.7.0 + 2.9.3 + 2.7.3 4.3.13.RELEASE @@ -73,7 +73,7 @@ 18.1.0 2.10.0 3.9.1 - 1.8.0 + 1.12.0 4.1.19.Final 2.7.1 4.12