Skip to content

Commit

Permalink
[#9030] Backport: update thrift plugin for 0.14 or later
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Jul 11, 2022
1 parent 3a14ae8 commit 49f5b02
Show file tree
Hide file tree
Showing 28 changed files with 1,069 additions and 75 deletions.
1 change: 0 additions & 1 deletion plugins-it/thrift-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class AsyncEchoTestClient implements EchoTestClient {
private final EchoService.AsyncClient asyncClient;
private final TAsyncClientManager asyncClientManager = new TAsyncClientManager();

private AsyncEchoTestClient(TestEnvironment environment) throws IOException {
private AsyncEchoTestClient(TestEnvironment environment) throws Exception {
this.environment = environment;
this.transport = new TNonblockingSocket(this.environment.getServerIp(), this.environment.getPort());
this.asyncClient = new EchoService.AsyncClient(this.environment.getProtocolFactory(), this.asyncClientManager, this.transport);
Expand Down Expand Up @@ -168,7 +168,7 @@ public void onError(Exception exception) {
}

public static class Client extends AsyncEchoTestClient {
public Client(TestEnvironment environment) throws IOException {
public Client(TestEnvironment environment) throws Exception {
super(environment);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
Expand Down Expand Up @@ -106,8 +105,8 @@ public Client(TestEnvironment environment) throws TTransportException {
}

public static class ClientForNonblockingServer extends SyncEchoTestClient {
public ClientForNonblockingServer(TestEnvironment environment) throws TTransportException {
super(environment, new TFramedTransport(new TSocket(environment.getServerIp(), environment.getPort())));
public ClientForNonblockingServer(TestEnvironment environment) throws Exception {
super(environment, TTransportInstanceCreator.create(SyncEchoTestClient014.class.getClassLoader(), "org.apache.thrift.transport.TFramedTransport", new TSocket(environment.getServerIp(), environment.getPort())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.thrift.common.client;

import com.navercorp.pinpoint.bootstrap.plugin.test.Expectations;
import com.navercorp.pinpoint.bootstrap.plugin.test.ExpectedAnnotation;
import com.navercorp.pinpoint.bootstrap.plugin.test.PluginTestVerifier;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.plugin.thrift.common.TestEnvironment;
import com.navercorp.pinpoint.plugin.thrift.dto.EchoService;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;

import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.event;

/**
* @author HyunGil Jeong
*/
public abstract class SyncEchoTestClient014 implements EchoTestClient {

private final TestEnvironment environment;
private final TTransport transport;

private SyncEchoTestClient014(TestEnvironment environment, TTransport transport) throws TTransportException {
this.environment = environment;
this.transport = transport;
this.transport.open();
}

@Override
public final String echo(String message) throws TException {
TProtocol protocol = this.environment.getProtocolFactory().getProtocol(transport);
EchoService.Client client = new EchoService.Client(protocol);
return client.echo(message);
}

@Override
public void verifyTraces(PluginTestVerifier verifier, String expectedMessage) throws Exception {
// refer to TServiceClientSendBaseInterceptor.getRemoteAddress(...)
final InetSocketAddress socketAddress = this.environment.getServerAddress();
final String hostName = SocketAddressUtils.getHostNameFirst(socketAddress);
final String remoteAddress = HostAndPort.toHostAndPortString(hostName, socketAddress.getPort());

// SpanEvent - TServiceClient.sendBase
Method sendBase = TServiceClient.class.getDeclaredMethod("sendBase", String.class, TBase.class);

ExpectedAnnotation thriftUrl = Expectations.annotation("thrift.url",
remoteAddress + "/com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo");
ExpectedAnnotation thriftArgs = Expectations.annotation("thrift.args",
"echo_args(message:" + expectedMessage + ")");

// SpanEvent - TServiceClient.receiveBase
Method receiveBase = TServiceClient.class.getDeclaredMethod("receiveBase", TBase.class, String.class);
ExpectedAnnotation thriftResult = Expectations.annotation("thrift.result", "echo_result(success:"
+ expectedMessage + ")");

verifier.verifyDiscreteTrace(event("THRIFT_CLIENT", // ServiceType
sendBase, // Method
null, // rpc
null, // endPoint
remoteAddress, // destinationId
thriftUrl, // Annotation("thrift.url")
thriftArgs), // Annotation("thrift.args")
event("THRIFT_CLIENT_INTERNAL", // ServiceType
receiveBase, // Method
thriftResult // Annotation("thrift.result")
));
}

@Override
public void close() {
if (this.transport.isOpen()) {
this.transport.close();
}
}

public static class Client extends SyncEchoTestClient014 {
public Client(TestEnvironment environment) throws TTransportException {
super(environment, new TSocket(environment.getServerIp(), environment.getPort()));
}
}

public static class ClientForNonblockingServer extends SyncEchoTestClient014 {
public ClientForNonblockingServer(TestEnvironment environment) throws Exception {
super(environment, TTransportInstanceCreator.create(SyncEchoTestClient014.class.getClassLoader(), "org.apache.thrift.transport.layered.TFramedTransport", new TSocket(environment.getServerIp(), environment.getPort())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.thrift.common.client;

import org.apache.thrift.transport.TTransport;

import java.lang.reflect.Constructor;

public class TTransportInstanceCreator {

public static TTransport create(ClassLoader classLoader, String className, TTransport transport) throws Exception {
Class<?> clazz = classLoader.loadClass(className);
Constructor<?> constructor = clazz.getConstructor(TTransport.class);
return (TTransport) constructor.newInstance(transport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public static AsyncEchoTestServer<TThreadedSelectorServer> threadedSelectorServe
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer<TThreadedSelectorServer>(server, environment) {
@Override
public SyncEchoTestClient getSynchronousClient() throws TTransportException {
public SyncEchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient.ClientForNonblockingServer(environment);
}

@Override
public AsyncEchoTestClient getAsynchronousClient() throws IOException {
public AsyncEchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
Expand All @@ -108,12 +108,12 @@ public static AsyncEchoTestServer<TNonblockingServer> nonblockingServer(final Te
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer<TNonblockingServer>(server, environment) {
@Override
public SyncEchoTestClient getSynchronousClient() throws TTransportException {
public SyncEchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient.ClientForNonblockingServer(environment);
}

@Override
public AsyncEchoTestClient getAsynchronousClient() throws IOException {
public AsyncEchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
Expand All @@ -127,12 +127,12 @@ public static AsyncEchoTestServer<THsHaServer> halfSyncHalfAsyncServer(final Tes
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer<THsHaServer>(server, environment) {
@Override
public SyncEchoTestClient getSynchronousClient() throws TTransportException {
public SyncEchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient.ClientForNonblockingServer(environment);
}

@Override
public AsyncEchoTestClient getAsynchronousClient() throws IOException {
public AsyncEchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.thrift.common.server;

import com.navercorp.pinpoint.bootstrap.plugin.test.PluginTestVerifier;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.plugin.thrift.common.TestEnvironment;
import com.navercorp.pinpoint.plugin.thrift.common.client.AsyncEchoTestClient;
import com.navercorp.pinpoint.plugin.thrift.common.client.EchoTestClient;
import com.navercorp.pinpoint.plugin.thrift.common.client.SyncEchoTestClient;
import com.navercorp.pinpoint.plugin.thrift.common.client.SyncEchoTestClient014;
import com.navercorp.pinpoint.plugin.thrift.dto.EchoService;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;

import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.annotation;
import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.event;
import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.root;

/**
* @author HyunGil Jeong
*/
public abstract class AsyncEchoTestServer014<T extends AbstractNonblockingServer> extends ThriftEchoTestServer<T> {

protected AsyncEchoTestServer014(T server, TestEnvironment environment) {
super(server, environment);
}

@Override
public void verifyServerTraces(PluginTestVerifier verifier) throws Exception {
final InetSocketAddress socketAddress = super.environment.getServerAddress();
final String address = SocketAddressUtils.getAddressFirst(socketAddress);
verifier.verifyTraceCount(2);
// Method process = TBaseAsyncProcessor.class.getDeclaredMethod("process", AsyncFrameBuffer.class);
Method process = TBinaryProtocol.class.getDeclaredMethod("readMessageEnd");
// RootSpan
verifier.verifyTrace(root("THRIFT_SERVER", // ServiceType,
"Thrift Server Invocation", // Method
"com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo", // rpc
HostAndPort.toHostAndPortString(address, socketAddress.getPort()), // endPoint
address // remoteAddress
));
// SpanEvent - TBaseAsyncProcessor.process
verifier.verifyTrace(event("THRIFT_SERVER_INTERNAL", process, annotation("thrift.url", "com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo")));
}

public static class AsyncEchoTestServerFactory {

private static TProcessor getAsyncProcessor() {
return new EchoService.AsyncProcessor<EchoService.AsyncIface>(new EchoService.AsyncIface() {
@Override
public void echo(String message, AsyncMethodCallback<String> resultHandler) throws TException {
resultHandler.onComplete(message);
}
});
}

public static AsyncEchoTestServer014<TThreadedSelectorServer> threadedSelectorServer(
final TestEnvironment environment) throws TTransportException {
TThreadedSelectorServer server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(
new TNonblockingServerSocket(environment.getPort())).processor(getAsyncProcessor())
.inputProtocolFactory(environment.getProtocolFactory())
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer014<TThreadedSelectorServer>(server, environment) {
@Override
public EchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient014.ClientForNonblockingServer(environment);
}

@Override
public EchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
}

public static AsyncEchoTestServer014<TNonblockingServer> nonblockingServer(final TestEnvironment environment)
throws TTransportException {
TNonblockingServer server = new TNonblockingServer(new TNonblockingServer.Args(
new TNonblockingServerSocket(environment.getPort())).processor(getAsyncProcessor())
.inputProtocolFactory(environment.getProtocolFactory())
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer014<TNonblockingServer>(server, environment) {
@Override
public EchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient014.ClientForNonblockingServer(environment);
}

@Override
public EchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
}

public static AsyncEchoTestServer014<THsHaServer> halfSyncHalfAsyncServer(final TestEnvironment environment)
throws TTransportException {
THsHaServer server = new THsHaServer(new THsHaServer.Args(new TNonblockingServerSocket(
environment.getPort())).processor(getAsyncProcessor())
.inputProtocolFactory(environment.getProtocolFactory())
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer014<THsHaServer>(server, environment) {
@Override
public EchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient014.ClientForNonblockingServer(environment);
}

@Override
public EchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
}
}

}
Loading

0 comments on commit 49f5b02

Please # to comment.