Skip to content

Commit

Permalink
[#10335] Backport: update reactor publishOn, subscribeOn options
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Sep 14, 2023
1 parent 93df57c commit 091523d
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 13 deletions.
3 changes: 3 additions & 0 deletions agent/src/main/resources/profiles/local/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,9 @@ profiler.spring.tx.enable=true
profiler.reactor.enable=true
# Set whether to trace the onErrorComplete(), onErrorResume(), onErrorMap(), onErrorReturn() methods
profiler.reactor.trace.onError=false
# Set whether to trace the publishOn(), subscribeOn() methods
profiler.reactor.trace.publishOn=true
profiler.reactor.trace.subscribeOn=true

###########################################################
# log4j (guide url : https://github.com/naver/pinpoint/blob/master/doc/per-request_feature_guide.md)
Expand Down
3 changes: 3 additions & 0 deletions agent/src/main/resources/profiles/release/pinpoint.config
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,9 @@ profiler.spring.tx.enable=true
profiler.reactor.enable=true
# Set whether to trace the onErrorComplete(), onErrorResume(), onErrorMap(), onErrorReturn() methods
profiler.reactor.trace.onError=false
# Set whether to trace the publishOn(), subscribeOn() methods
profiler.reactor.trace.publishOn=true
profiler.reactor.trace.subscribeOn=true

###########################################################
# log4j (guide url : https://github.com/naver/pinpoint/blob/master/doc/per-request_feature_guide.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.navercorp.pinpoint.plugin.reactor.interceptor.ConnectableFluxConstructorInterceptor;
import com.navercorp.pinpoint.plugin.reactor.interceptor.ConnectableFluxSubscribeInterceptor;
import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxAndMonoPublishOnInterceptor;
import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxAndMonoSubscribeOnInterceptor;
import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxConstructorInterceptor;
import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxDelaySubscriptionConstructorInterceptor;
import com.navercorp.pinpoint.plugin.reactor.interceptor.FluxDelaySubscriptionSubscribeInterceptor;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void setup(ProfilerPluginSetupContext context) {
}
logger.info("{} version range=[3.1.0.RELEASE, 3.3.0.RELEASE], config:{}", this.getClass().getSimpleName(), config);

addThreadingAndSchedulers();
addFlux();
addMono();
addParallelFlux();
Expand All @@ -115,7 +117,7 @@ public void setTransformTemplate(MatchableTransformTemplate transformTemplate) {
this.transformTemplate = transformTemplate;
}

private void addFlux() {
private void addThreadingAndSchedulers() {
transformTemplate.transform("reactor.core.publisher.Flux", FluxMethodTransform.class);
// publishOn
addFluxOperatorTransform("reactor.core.publisher.FluxPublishOn");
Expand All @@ -130,6 +132,18 @@ private void addFlux() {
addFluxOperatorTransform("reactor.core.publisher.FluxSubscribeOn");
addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber");

transformTemplate.transform("reactor.core.publisher.Mono", MonoMethodTransform.class);
// publishOn
addMonoOperatorTransform("reactor.core.publisher.MonoPublishOn");
addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoPublishOn$PublishOnSubscriber");
// subscribeOn
addMonoTransform("reactor.core.publisher.MonoSubscribeOnValue");
addMonoTransform("reactor.core.publisher.MonoSubscribeOnCallable");
addMonoOperatorTransform("reactor.core.publisher.MonoSubscribeOn");
addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber");
}

private void addFlux() {
// Flux
addConnectableFluxTransform("reactor.core.publisher.ConnectableFluxHide");
addConnectableFluxTransform("reactor.core.publisher.ConnectableFluxOnAssembly");
Expand Down Expand Up @@ -292,16 +306,6 @@ private void addFlux() {
}

private void addMono() {
transformTemplate.transform("reactor.core.publisher.Mono", MonoMethodTransform.class);
// publishOn
addMonoOperatorTransform("reactor.core.publisher.MonoPublishOn");
addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoPublishOn$PublishOnSubscriber");
// subscribeOn
addMonoTransform("reactor.core.publisher.MonoSubscribeOnValue");
addMonoTransform("reactor.core.publisher.MonoSubscribeOnCallable");
addMonoOperatorTransform("reactor.core.publisher.MonoSubscribeOn");
addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber");

// mono
addMonoOperatorTransform("reactor.core.publisher.MonoAll");
addMonoOperatorTransform("reactor.core.publisher.MonoAny");
Expand Down Expand Up @@ -480,7 +484,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin
}
final InstrumentMethod subscribeOnMethod = target.getDeclaredMethod("subscribeOn", "reactor.core.scheduler.Scheduler", "boolean");
if (subscribeOnMethod != null) {
subscribeOnMethod.addInterceptor(FluxAndMonoPublishOnInterceptor.class);
subscribeOnMethod.addInterceptor(FluxAndMonoSubscribeOnInterceptor.class);
}

return target.toBytecode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ public class ReactorPluginConfig {
private final boolean enable;

private final boolean traceOnError;
private final boolean tracePublishOn;
private final boolean traceSubscribeOn;

public ReactorPluginConfig(ProfilerConfig config) {
Objects.requireNonNull(config, "config");

// plugin
this.enable = config.readBoolean("profiler.reactor.enable", true);
this.traceOnError = config.readBoolean("profiler.reactor.trace.onError", false);
this.tracePublishOn = config.readBoolean("profiler.reactor.trace.publishOn", true);
this.traceSubscribeOn = config.readBoolean("profiler.reactor.trace.subscribeOn", true);
}

public boolean isEnable() {
Expand All @@ -44,11 +48,21 @@ public boolean isTraceOnError() {
return traceOnError;
}

public boolean isTracePublishOn() {
return tracePublishOn;
}

public boolean isTraceSubscribeOn() {
return traceSubscribeOn;
}

@Override
public String toString() {
return "ReactorPluginConfig{" +
"enable=" + enable +
", traceOnError=" + traceOnError +
", tracePublishOn=" + tracePublishOn +
", traceSubscribeOn=" + traceSubscribeOn +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.SpanEventSimpleAroundInterceptorForPlugin;
import com.navercorp.pinpoint.plugin.reactor.ReactorConstants;
import com.navercorp.pinpoint.plugin.reactor.ReactorPluginConfig;

public class FluxAndMonoPublishOnInterceptor extends SpanEventSimpleAroundInterceptorForPlugin {

private final boolean tracePublishOn;

// public final Mono<T> publishOn(Scheduler scheduler)
public FluxAndMonoPublishOnInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);
final ReactorPluginConfig config = new ReactorPluginConfig(traceContext.getProfilerConfig());
this.tracePublishOn = config.isTracePublishOn();
}

@Override
Expand All @@ -41,7 +46,7 @@ public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] a
recorder.recordServiceType(ReactorConstants.REACTOR_NETTY);
recorder.recordException(throwable);

if (isAsync(result, throwable)) {
if (tracePublishOn && isAsync(result, throwable)) {
// make asynchronous trace-id
final AsyncContext asyncContext = recorder.recordNextAsyncContext();
((AsyncContextAccessor) result)._$PINPOINT$_setAsyncContext(asyncContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.reactor.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.SpanEventSimpleAroundInterceptorForPlugin;
import com.navercorp.pinpoint.plugin.reactor.ReactorConstants;
import com.navercorp.pinpoint.plugin.reactor.ReactorPluginConfig;

public class FluxAndMonoSubscribeOnInterceptor extends SpanEventSimpleAroundInterceptorForPlugin {

private final boolean traceSubscribeOn;

// public final Mono<T> subscribeOn(Scheduler scheduler)
public FluxAndMonoSubscribeOnInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);
final ReactorPluginConfig config = new ReactorPluginConfig(traceContext.getProfilerConfig());
this.traceSubscribeOn = config.isTraceSubscribeOn();
}

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) throws Exception {
}

@Override
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) throws Exception {
recorder.recordApi(methodDescriptor);
recorder.recordServiceType(ReactorConstants.REACTOR_NETTY);
recorder.recordException(throwable);

if (traceSubscribeOn && isAsync(result, throwable)) {
// make asynchronous trace-id
final AsyncContext asyncContext = recorder.recordNextAsyncContext();
((AsyncContextAccessor) result)._$PINPOINT$_setAsyncContext(asyncContext);
if (isDebug) {
logger.debug("Set asyncContext to result. asyncContext={}", asyncContext);
}
}
}

private boolean isAsync(Object result, Throwable throwable) {
if (throwable != null) {
return false;
}
if (Boolean.FALSE == (result instanceof AsyncContextAccessor)) {
return false;
}
return true;
}
}

0 comments on commit 091523d

Please # to comment.