From ee9917c0bf5a6617933676c7a786308921a51a01 Mon Sep 17 00:00:00 2001 From: Jaehong-Kim Date: Thu, 14 Sep 2023 11:33:44 +0900 Subject: [PATCH] [#10335] Backport: update reactor publishOn, subscribeOn options --- .../resources/profiles/local/pinpoint.config | 3 + .../profiles/release/pinpoint.config | 3 + .../plugin/reactor/ReactorPlugin.java | 28 ++++---- .../plugin/reactor/ReactorPluginConfig.java | 14 ++++ .../FluxAndMonoPublishOnInterceptor.java | 7 +- .../FluxAndMonoSubscribeOnInterceptor.java | 68 +++++++++++++++++++ 6 files changed, 110 insertions(+), 13 deletions(-) create mode 100644 plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoSubscribeOnInterceptor.java diff --git a/agent/src/main/resources/profiles/local/pinpoint.config b/agent/src/main/resources/profiles/local/pinpoint.config index a4c3f545f203..e89ecc98613b 100644 --- a/agent/src/main/resources/profiles/local/pinpoint.config +++ b/agent/src/main/resources/profiles/local/pinpoint.config @@ -1086,6 +1086,9 @@ profiler.spring.tx.enable=true profiler.reactor.enable=true # Set whether to trace the onErrorComplete(), onErrorResume(), onErrorMap(), onErrorReturn() methods profiler.reactor.trace.onError=false +# Set whether to trace the publishOn(), subscribeOn() methods +profiler.reactor.trace.publishOn=true +profiler.reactor.trace.subscribeOn=true ########################################################### # log4j (guide url : https://github.com/naver/pinpoint/blob/master/doc/per-request_feature_guide.md) diff --git a/agent/src/main/resources/profiles/release/pinpoint.config b/agent/src/main/resources/profiles/release/pinpoint.config index 62732e2690ea..c195e3d28278 100644 --- a/agent/src/main/resources/profiles/release/pinpoint.config +++ b/agent/src/main/resources/profiles/release/pinpoint.config @@ -1085,6 +1085,9 @@ profiler.spring.tx.enable=true profiler.reactor.enable=true # Set whether to trace the onErrorComplete(), onErrorResume(), onErrorMap(), onErrorReturn() methods profiler.reactor.trace.onError=false +# Set whether to trace the publishOn(), subscribeOn() methods +profiler.reactor.trace.publishOn=true +profiler.reactor.trace.subscribeOn=true ########################################################### # log4j (guide url : https://github.com/naver/pinpoint/blob/master/doc/per-request_feature_guide.md) diff --git a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java index 3991e17231a2..56f9a9325840 100644 --- a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java +++ b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java @@ -38,6 +38,7 @@ import com.navercorp.pinpoint.plugin.reactor.interceptor.ConnectableFluxConstructorInterceptor; import com.navercorp.pinpoint.plugin.reactor.interceptor.ConnectableFluxSubscribeInterceptor; import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxAndMonoPublishOnInterceptor; +import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxAndMonoSubscribeOnInterceptor; import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxConstructorInterceptor; import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxDelaySubscriptionConstructorInterceptor; import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxDelaySubscriptionSubscribeInterceptor; @@ -100,6 +101,7 @@ public void setup(ProfilerPluginSetupContext context) { } logger.info("{} version range=[3.1.0.RELEASE, 3.3.0.RELEASE], config:{}", this.getClass().getSimpleName(), config); + addThreadingAndSchedulers(); addFlux(); addMono(); addParallelFlux(); @@ -115,7 +117,7 @@ public void setTransformTemplate(MatchableTransformTemplate transformTemplate) { this.transformTemplate = transformTemplate; } - private void addFlux() { + private void addThreadingAndSchedulers() { transformTemplate.transform("reactor.core.publisher.Flux", FluxMethodTransform.class); // publishOn addFluxOperatorTransform("reactor.core.publisher.FluxPublishOn"); @@ -130,6 +132,18 @@ private void addFlux() { addFluxOperatorTransform("reactor.core.publisher.FluxSubscribeOn"); addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber"); + transformTemplate.transform("reactor.core.publisher.Mono", MonoMethodTransform.class); + // publishOn + addMonoOperatorTransform("reactor.core.publisher.MonoPublishOn"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoPublishOn$PublishOnSubscriber"); + // subscribeOn + addMonoTransform("reactor.core.publisher.MonoSubscribeOnValue"); + addMonoTransform("reactor.core.publisher.MonoSubscribeOnCallable"); + addMonoOperatorTransform("reactor.core.publisher.MonoSubscribeOn"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber"); + } + + private void addFlux() { // Flux addConnectableFluxTransform("reactor.core.publisher.ConnectableFluxHide"); addConnectableFluxTransform("reactor.core.publisher.ConnectableFluxOnAssembly"); @@ -292,16 +306,6 @@ private void addFlux() { } private void addMono() { - transformTemplate.transform("reactor.core.publisher.Mono", MonoMethodTransform.class); - // publishOn - addMonoOperatorTransform("reactor.core.publisher.MonoPublishOn"); - addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoPublishOn$PublishOnSubscriber"); - // subscribeOn - addMonoTransform("reactor.core.publisher.MonoSubscribeOnValue"); - addMonoTransform("reactor.core.publisher.MonoSubscribeOnCallable"); - addMonoOperatorTransform("reactor.core.publisher.MonoSubscribeOn"); - addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber"); - // mono addMonoOperatorTransform("reactor.core.publisher.MonoAll"); addMonoOperatorTransform("reactor.core.publisher.MonoAny"); @@ -480,7 +484,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } final InstrumentMethod subscribeOnMethod = target.getDeclaredMethod("subscribeOn", "reactor.core.scheduler.Scheduler", "boolean"); if (subscribeOnMethod != null) { - subscribeOnMethod.addInterceptor(FluxAndMonoPublishOnInterceptor.class); + subscribeOnMethod.addInterceptor(FluxAndMonoSubscribeOnInterceptor.class); } return target.toBytecode(); diff --git a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPluginConfig.java b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPluginConfig.java index ee2ef8a5b489..d73719145b84 100644 --- a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPluginConfig.java +++ b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPluginConfig.java @@ -27,6 +27,8 @@ public class ReactorPluginConfig { private final boolean enable; private final boolean traceOnError; + private final boolean tracePublishOn; + private final boolean traceSubscribeOn; public ReactorPluginConfig(ProfilerConfig config) { Objects.requireNonNull(config, "config"); @@ -34,6 +36,8 @@ public ReactorPluginConfig(ProfilerConfig config) { // plugin this.enable = config.readBoolean("profiler.reactor.enable", true); this.traceOnError = config.readBoolean("profiler.reactor.trace.onError", false); + this.tracePublishOn = config.readBoolean("profiler.reactor.trace.publishOn", true); + this.traceSubscribeOn = config.readBoolean("profiler.reactor.trace.subscribeOn", true); } public boolean isEnable() { @@ -44,11 +48,21 @@ public boolean isTraceOnError() { return traceOnError; } + public boolean isTracePublishOn() { + return tracePublishOn; + } + + public boolean isTraceSubscribeOn() { + return traceSubscribeOn; + } + @Override public String toString() { return "ReactorPluginConfig{" + "enable=" + enable + ", traceOnError=" + traceOnError + + ", tracePublishOn=" + tracePublishOn + + ", traceSubscribeOn=" + traceSubscribeOn + '}'; } } \ No newline at end of file diff --git a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoPublishOnInterceptor.java b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoPublishOnInterceptor.java index d45036a27785..cccb73ca5f6e 100644 --- a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoPublishOnInterceptor.java +++ b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoPublishOnInterceptor.java @@ -23,12 +23,17 @@ import com.navercorp.pinpoint.bootstrap.context.TraceContext; import com.navercorp.pinpoint.bootstrap.interceptor.SpanEventSimpleAroundInterceptorForPlugin; import com.navercorp.pinpoint.plugin.reactor.ReactorConstants; +import com.navercorp.pinpoint.plugin.reactor.ReactorPluginConfig; public class FluxAndMonoPublishOnInterceptor extends SpanEventSimpleAroundInterceptorForPlugin { + private final boolean tracePublishOn; + // public final Mono publishOn(Scheduler scheduler) public FluxAndMonoPublishOnInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) { super(traceContext, methodDescriptor); + final ReactorPluginConfig config = new ReactorPluginConfig(traceContext.getProfilerConfig()); + this.tracePublishOn = config.isTracePublishOn(); } @Override @@ -41,7 +46,7 @@ public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] a recorder.recordServiceType(ReactorConstants.REACTOR_NETTY); recorder.recordException(throwable); - if (isAsync(result, throwable)) { + if (tracePublishOn && isAsync(result, throwable)) { // make asynchronous trace-id final AsyncContext asyncContext = recorder.recordNextAsyncContext(); ((AsyncContextAccessor) result)._$PINPOINT$_setAsyncContext(asyncContext); diff --git a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoSubscribeOnInterceptor.java b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoSubscribeOnInterceptor.java new file mode 100644 index 000000000000..7278577d3209 --- /dev/null +++ b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/interceptor/FluxAndMonoSubscribeOnInterceptor.java @@ -0,0 +1,68 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.plugin.reactor.interceptor; + +import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor; +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; +import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder; +import com.navercorp.pinpoint.bootstrap.context.TraceContext; +import com.navercorp.pinpoint.bootstrap.interceptor.SpanEventSimpleAroundInterceptorForPlugin; +import com.navercorp.pinpoint.plugin.reactor.ReactorConstants; +import com.navercorp.pinpoint.plugin.reactor.ReactorPluginConfig; + +public class FluxAndMonoSubscribeOnInterceptor extends SpanEventSimpleAroundInterceptorForPlugin { + + private final boolean traceSubscribeOn; + + // public final Mono subscribeOn(Scheduler scheduler) + public FluxAndMonoSubscribeOnInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) { + super(traceContext, methodDescriptor); + final ReactorPluginConfig config = new ReactorPluginConfig(traceContext.getProfilerConfig()); + this.traceSubscribeOn = config.isTraceSubscribeOn(); + } + + @Override + public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) throws Exception { + } + + @Override + public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) throws Exception { + recorder.recordApi(methodDescriptor); + recorder.recordServiceType(ReactorConstants.REACTOR_NETTY); + recorder.recordException(throwable); + + if (traceSubscribeOn && isAsync(result, throwable)) { + // make asynchronous trace-id + final AsyncContext asyncContext = recorder.recordNextAsyncContext(); + ((AsyncContextAccessor) result)._$PINPOINT$_setAsyncContext(asyncContext); + if (isDebug) { + logger.debug("Set asyncContext to result. asyncContext={}", asyncContext); + } + } + } + + private boolean isAsync(Object result, Throwable throwable) { + if (throwable != null) { + return false; + } + if (Boolean.FALSE == (result instanceof AsyncContextAccessor)) { + return false; + } + return true; + } +}