Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Can we trace the standlone thrift RPC server? #2106

Closed
duwupeng opened this issue Sep 26, 2016 · 14 comments
Closed

Can we trace the standlone thrift RPC server? #2106

duwupeng opened this issue Sep 26, 2016 · 14 comments

Comments

@duwupeng
Copy link

duwupeng commented Sep 26, 2016

Hi All,

First of all, Thank you so much for the lovely tool. I have gone through it and found really interesting.

I have checked this by configuring agent on Tomcat and it's showing more than I expected.

Now, The question is :- Can we trace the thrift RPC client and server?

I look forward for your response.

Cheers,
duwupeng

2016-09-26 17:49:03 [DEBUG](c.n.p.b.i.s.ScopedInterceptor :298) tryBefore() returns false: interceptorScopeTransaction: InterceptorScopeInvocation(ThriftClientScope)[depth=0], executionPoint: INTERNAL. Skip interceptor class com.navercorp.pinpoint.plugin.thrift.interceptor.tprotocol.client.TProtocolWriteFieldStopInterceptor
2016-09-26 17:49:03 [DEBUG](c.n.p.b.i.s.ScopedInterceptor :298) tryAfter() returns false: interceptorScopeTransaction: InterceptorScopeInvocation(ThriftClientScope)[depth=0], executionPoint: INTERNAL. Skip interceptor class com.navercorp.pinpoint.plugin.thrift.interceptor.tprotocol.client.TProtocolWriteFieldStopInterceptor

@duwupeng
Copy link
Author

duwupeng commented Sep 26, 2016

The thrift plugin has been enabled , while it can not trace the data.

profiler.thrift.client=true
profiler.thrift.client.async=true
profiler.thrift.processor=true
profiler.thrift.processor.async=true
profiler.thrift.service.args=true
profiler.thrift.service.result=true

@duwupeng duwupeng changed the title Why the thrift plugin can not trace the Can we trace the standlone thrift RPC server? Sep 26, 2016
@Xylus
Copy link
Contributor

Xylus commented Sep 26, 2016

Hi duwupeng,
Could you give a short explanation on how you defined your processors? Are you using the default processor auto-generated from the IDL?

@duwupeng
Copy link
Author

duwupeng commented Sep 26, 2016

Hell Xylus ,
Thanks for four reply。
As you said , we defined the processors using the thrift Compiler (0.9.2) from IDL, and the services has been brought up by the TThreadedSelectorServer in a java main process.

@Xylus
Copy link
Contributor

Xylus commented Sep 26, 2016

Hmm..that's weird, will definitely need to look into it further.
Could you post the agent log please? It should be under '$AGENT_DIR/logs' directory.
Thanks!

@duwupeng
Copy link
Author

duwupeng commented Sep 26, 2016

sure. The the agent log and processor are as below.

1. The agent log:

2016-09-26 18:55:56 [DEBUG](c.n.p.p.t.i.s.ProcessFunctionProcessInterceptor:96 ) BEFORE com.kuaisu.infra.thrift.service.HeartBeatService$Processor$hello  args:(1, StoredMessageProtocol, TCompactProtocol, HeartBeatServiceImpl)
2016-09-26 18:55:56 [DEBUG](c.n.p.p.t.i.s.ProcessFunctionProcessInterceptor:288) Invalid target object. Need field accessor(com.navercorp.pinpoint.plugin.thrift.field.accessor.ServerMarkerFlagFieldAccessor).
2016-09-26 18:55:56 [DEBUG](c.n.p.p.t.i.t.s.TProtocolReadFieldBeginInterceptor:121) AFTER org.apache.thrift.protocol.TCompactProtocol  args:() result:TField
2016-09-26 18:55:56 [DEBUG](c.n.p.p.t.i.t.s.TProtocolReadMessageEndInterceptor:121) AFTER org.apache.thrift.protocol.TCompactProtocol  args:() result:null
2016-09-26 18:55:56 [DEBUG](c.n.p.b.i.s.ScopedInterceptor      :298) tryBefore() returns false: interceptorScopeTransaction: InterceptorScopeInvocation(ThriftClientScope)[depth=0], executionPoint: INTERNAL. Skip interceptor class com.navercorp.pinpoint.plugin.thrift.interceptor.tprotocol.client.TProtocolWriteFieldStopInterceptor
2016-09-26 18:55:56 [DEBUG](c.n.p.b.i.s.ScopedInterceptor      :298) tryAfter() returns false: interceptorScopeTransaction: InterceptorScopeInvocation(ThriftClientScope)[depth=0], executionPoint: INTERNAL. Skip interceptor class com.navercorp.pinpoint.plugin.thrift.interceptor.tprotocol.client.TProtocolWriteFieldStopInterceptor
2016-09-26 18:55:56 [DEBUG](c.n.p.p.t.i.s.ProcessFunctionProcessInterceptor:121) AFTER com.kuaisu.infra.thrift.service.HeartBeatService$Processor$hello  args:(1, StoredMessageProtocol, TCompactProtocol, HeartBeatServiceImpl) result:null
2016-09-26 18:55:56 [DEBUG](c.n.p.b.i.s.ScopedInterceptor      :298) tryBefore() returns false: interceptorScopeTransaction: InterceptorScopeInvocation(ThriftServerScope)[depth=0], executionPoint: INTERNAL. Skip interceptor class com.navercorp.pinpoint.plugin.thrift.interceptor.tprotocol.server.TProtocolReadMessageBeginInterceptor
2016-09-26 18:55:56 [DEBUG](c.n.p.b.i.s.ScopedInterceptor      :298) tryAfter() returns false: interceptorScopeTransaction: InterceptorScopeInvocation(ThriftServerScope)[depth=0], executionPoint: INTERNAL. Skip interceptor class com.navercorp.pinpoint.plugin.thrift.interceptor.tprotocol.server.TProtocolReadMessageBeginInterceptor

2. The processor :
/**

  • Autogenerated by Thrift Compiler (0.9.2)
    *
  • DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
  • @generated
    */
    package com.kuaisu.infra.thrift.service;

import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Generated;
import java.util.*;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-27")
public class HeartBeatService {

/**

  • Common interface
    */
    public interface Iface {

    /**

    • Heart Beart
      */
      public void hello() throws org.apache.thrift.TException;

    }

    public interface AsyncIface {

    public void hello(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;

    }

    public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    public static class Factory implements org.apache.thrift.TServiceClientFactory {
    public Factory() {}
    public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
    return new Client(prot);
    }
    public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
    return new Client(iprot, oprot);
    }
    }
    ...

@Xylus
Copy link
Contributor

Xylus commented Sep 27, 2016

@duwupeng It looks like you're using TTupleProtocol, and unfortunately I don't think the Thrift plugin supports tracing it (I think only the TBinaryProtocol, TCompactProtocol, and TJSONProtocol are supported currently). There probably was a reason why I left TTupleProtocol out but I don't quite remember exactly why as it's been a while since I developed it. I'll look into this so that I can give you a better answer.

In the mean time, you could try adding TTupleProtocol support yourself if you're building from source (master).
Simply add addTProtocolInterceptors(config, "org.apache.thrift.protocol.TTupleProtocol"); to ThriftPlugin.java and build the plugins and agent module. Then use this agent and see if it works.

@duwupeng
Copy link
Author

duwupeng commented Sep 27, 2016

@Xylus , Thanks a lot for you investigation . and I am afraid that I am using the TCompactProtocol
The following is my app server .

import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AppServer {
    protected static ClassPathXmlApplicationContext context;
    public static void main(String[] args) {
        String logDir = System.getProperty("log.dir");
        if (logDir == null) {
            System.setProperty("log.dir", ".");
        }
        final Logger logger = LoggerFactory.getLogger(AppServer.class);
        PropertyConfigurator.configure(AppServer.class.getClassLoader().getResource("logback.xml"));
        context = new ClassPathXmlApplicationContext("spring/root.xml");
        SpringInstanceProvider provider = new SpringInstanceProvider(context);
        InstanceFactory.setInstanceProvider(provider);

        Integer port = (Integer) context.getBean("port");
        System.setProperty("serverPort", String.valueOf(port));

        logger.info("serverPort:{}", port);

        try {
            TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);

            TTransportFactory transportFactory = new TFramedTransport.Factory();

            TProtocolFactory proFactory = new TCompactProtocol.Factory();

            TMultiplexedProcessor processor = new TMultiplexedProcessor();

            String[] strings = context.getBeanNamesForType(org.apache.thrift.TProcessor.class);
            for (String beanName : strings) {
                TProcessor bean = (TProcessor) context.getBean(beanName);
                if (bean == null) {
                    logger.warn(">>>>{}<<<<Register failed", beanName);
                    continue;
                }
                beanName = bean.toString().split("\\$")[0];
                beanName = beanName.substring(beanName.lastIndexOf(".") + 1);

                logger.info("registerProcessor:{}", beanName);
                processor.registerProcessor(beanName, bean);
            }

            int corePoolSize = 100;
            int maximumPoolSize = 200;
            long keepAliveTime = 60;
            TimeUnit unit = TimeUnit.SECONDS;
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50);
            ThreadFactory threadFactory = new DefaultThreadFactory();

            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

            TThreadedSelectorServer.Args args1 = new TThreadedSelectorServer.Args(serverTransport)
                    .protocolFactory(proFactory)
                    .transportFactory(transportFactory)
                    .processor(processor)
                    //.workerThreads(35)
                    //.acceptQueueSizePerThread(30)
                    .executorService(poolExecutor);

            args1.maxReadBufferBytes = 1024 * 1024 * 5; // 5M

            //Create the server
            TServer server = new TThreadedSelectorServer(args1);

            logger.info("Start server on port {}...", port);
            RunModeHelper.initStart();
            server.serve();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    static class DefaultThreadFactory implements ThreadFactory {
        final AtomicInteger threadNumber = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "ks-trans-" + threadNumber.getAndIncrement());
        }
    }
}

@Xylus
Copy link
Contributor

Xylus commented Sep 27, 2016

@duwupeng Ah I see! Thanks for the code example.
I'll test it out and figure out why it's not being traced.

@duwupeng
Copy link
Author

duwupeng commented Sep 27, 2016

@Xylus , Thanks a lot, looking forward to your further reply
And I would like to provide the URL that wish you can accesss.
http://120.25.79.51:10128/stat.html.
I am with the trouble of tracing the sandbox4-ksservices which is the thrift Server.

@Xylus
Copy link
Contributor

Xylus commented Sep 28, 2016

@duwupeng
Looks like it's because we don't have support for TMultiplexedProcessor yet.
We'll take a look at adding support for TMultiplexedProcessor but it'll take some time.

If you can, could you try testing it by registering a TProcessor object directly to the server and see if that works?

@Xylus
Copy link
Contributor

Xylus commented Sep 28, 2016

@duwupeng So upon further inspection, the actual culprit of all this was that when using TMultiplexedProcessor, the actual input protocol is wrapped by TMultiplexedProcessor$StoredMessageProtocol.
So when ProcessFunction.process() is invoked, the interceptor injects the server marker into this decorated object and not the actual input protocol, stopping any trace from being generated.

You can test this out once #2110 is merged if you're building from source.
Let me know if anything looks weird.

Thanks!

@Xylus
Copy link
Contributor

Xylus commented Oct 21, 2016

@duwupeng With the latest source, you should now be able to trace your application.
Please reopen the issue if you have more questions.

@Xylus Xylus closed this as completed Oct 21, 2016
@DevotedTangLiu
Copy link

Yep I have tried 1.6.0, it worked.

@Xylus
Copy link
Contributor

Xylus commented Dec 1, 2016

@DevotedTangLiu Awesome, thanks for testing this out! Please let us know if there's anything broken.

# for free to join this conversation on GitHub. Already have an account? # to comment
Projects
None yet
Development

No branches or pull requests

3 participants