Trace is broken when consuming from Kafka with a Kotlin coroutine context #10407

Open
gipyeong-lee opened this issue Oct 13, 2023 · 2 comments · May be fixed by #10405

Comments

@gipyeong-lee

Prerequisites

What version of pinpoint are you using?

2.6.0-SNAPSHOT

Describe your problem

Kafka consumer -| trace broken |-> Kotlin coroutine -> WebClient request

What have you done?

  1. Service A: receives the initial request
  2. Service B: produces a message to Kafka
  3. Service C: consumes the message from Kafka
  4. Service C: within a Kotlin coroutine context, sends a WebClient API request to Service C
  5. Service D: (something to do)

Actual Result:

  1. User -> Service A -> Kafka -> Service B
  2. User -> Service C

Expected Result:

  1. User -> Service A -> Kafka -> Service B -> Service C

Screenshots

Actual Result
Screenshot 2023-10-13 2:54:05 PM

Expected Result
Screenshot 2023-10-13 2:55:59 PM

@emeroad
Member

emeroad commented Oct 16, 2023

This PR is not thread safe.
TraceContext must be thread safe, so it must not have state.

When delegating request processing to a thread pool, you must use the Async SDK:
https://github.com/pinpoint-apm/pinpoint/tree/master/agent-sdk
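
To illustrate the point about thread-pool delegation, here is a minimal sketch of why per-thread trace state does not follow a task onto a worker thread, and how capturing it at submission time restores it. Everything in it is an assumption made for illustration: ThreadBoundTraceDemo, CURRENT_TRACE and propagating are hypothetical names, a plain String stands in for a trace, and none of this is the Pinpoint agent or agent-sdk API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a tracer that binds the current trace to a thread.
// Not a Pinpoint class; a String plays the role of the trace object.
final class ThreadBoundTraceDemo {
    // Each thread sees only the value it set itself.
    private static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    static String currentTrace() {
        return CURRENT_TRACE.get();
    }

    // Captures the caller's trace when the task is created, binds it on the
    // worker thread while the task runs, then restores the worker's old value.
    static <T> Callable<T> propagating(Callable<T> task) {
        final String captured = CURRENT_TRACE.get();
        return () -> {
            final String previous = CURRENT_TRACE.get();
            CURRENT_TRACE.set(captured);
            try {
                return task.call();
            } finally {
                if (previous == null) {
                    CURRENT_TRACE.remove();
                } else {
                    CURRENT_TRACE.set(previous);
                }
            }
        };
    }

    // Returns { trace seen by a plain task, trace seen by a wrapped task }.
    static String[] run(String traceId) {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        final Callable<String> readTrace = ThreadBoundTraceDemo::currentTrace;
        CURRENT_TRACE.set(traceId); // bound to the submitting thread only
        try {
            final String plain = pool.submit(readTrace).get();
            final String wrapped = pool.submit(propagating(readTrace)).get();
            return new String[] { plain, wrapped };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TRACE.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        final String[] seen = run("trace-1");
        System.out.println("plain task sees:   " + seen[0]);
        System.out.println("wrapped task sees: " + seen[1]);
    }
}
```

In this sketch the plain task reads null on the worker thread, while the wrapped task reads the value captured on the submitting thread. The shared holder itself carries no per-request state; the state travels with each task.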

@gipyeong-lee
Author

gipyeong-lee commented Oct 16, 2023

@emeroad Thank you. But does the agent not support coroutine scope? When I applied that SDK, it did not work the way I wanted.
In DispatchInterceptor, traceContext.currentTraceObject() always returns null:

    @Override
    public void before(Object target, Object[] args) {
        if (isDebug) {
            logger.beforeInterceptor(target, args);
        }

        final Continuation continuation = getContinuation(args);
        if (continuation == null) {
            return;
        }

        if (isCompletedContinuation(continuation)) {
            return;
        }

        if (!checkSupportCoroutinesName(continuation)) {
            return;
        }

        final Trace trace = traceContext.currentTraceObject();
        if (trace == null) {
            return;
        }

        final SpanEventRecorder recorder = trace.traceBlockBegin();

        final AsyncContextAccessor asyncContextAccessor = getAsyncContextAccessor(args);
        if (asyncContextAccessor != null) {
            // make asynchronous trace-id
            final AsyncContext asyncContext = recorder.recordNextAsyncContext();
            asyncContextAccessor._$PINPOINT$_setAsyncContext(asyncContext);
        }
    }

As a result, tracing is cut off between the consumer thread and the Kotlin coroutine context.
By the way, I implemented some simple thread-safety code.
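
The consequence of the early return in the interceptor above can be sketched in isolation. This is a simplified model under stated assumptions, not the agent's code: DispatchGuardSketch, its nested Continuation class and the "async-of-" string are hypothetical stand-ins for the trace, the continuation and the async context. It only shows that no async context is attached when the dispatching thread has no trace bound; it does not explain why the trace is missing in the setup described here.

```java
// Hypothetical model of the guard in before(): all names are illustrative
// stand-ins, not Pinpoint classes.
final class DispatchGuardSketch {
    // Stand-in for the trace bound to the current thread.
    private static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    // Stand-in for a continuation that can carry an async context.
    static final class Continuation {
        private volatile String asyncContext;

        String asyncContext() {
            return asyncContext;
        }
    }

    // Same shape as the interceptor: without a trace on the calling thread,
    // return early and attach nothing.
    static void beforeDispatch(Continuation continuation) {
        final String trace = CURRENT_TRACE.get();
        if (trace == null) {
            return;
        }
        continuation.asyncContext = "async-of-" + trace;
    }

    // Simulates a consumer thread that optionally has a trace bound before
    // it dispatches the continuation. Returns the attached context, if any.
    static String dispatchFromConsumerThread(String traceIdOrNull) {
        final Continuation continuation = new Continuation();
        final Thread consumer = new Thread(() -> {
            if (traceIdOrNull != null) {
                CURRENT_TRACE.set(traceIdOrNull);
            }
            try {
                beforeDispatch(continuation);
            } finally {
                CURRENT_TRACE.remove();
            }
        }, "consumer");
        consumer.start();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return continuation.asyncContext();
    }

    public static void main(String[] args) {
        System.out.println("no trace on consumer thread: " + dispatchFromConsumerThread(null));
        System.out.println("trace on consumer thread:    " + dispatchFromConsumerThread("trace-1"));
    }
}
```

In this model, whatever resumes the continuation later has nothing to continue from unless a trace was already bound on the thread that dispatched it.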
