Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[#9030] Backport: update thrift plugin for 0.14 or later #9032

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion plugins-it/thrift-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class AsyncEchoTestClient implements EchoTestClient {
private final EchoService.AsyncClient asyncClient;
private final TAsyncClientManager asyncClientManager = new TAsyncClientManager();

private AsyncEchoTestClient(TestEnvironment environment) throws IOException {
private AsyncEchoTestClient(TestEnvironment environment) throws Exception {
this.environment = environment;
this.transport = new TNonblockingSocket(this.environment.getServerIp(), this.environment.getPort());
this.asyncClient = new EchoService.AsyncClient(this.environment.getProtocolFactory(), this.asyncClientManager, this.transport);
Expand Down Expand Up @@ -168,7 +168,7 @@ public void onError(Exception exception) {
}

public static class Client extends AsyncEchoTestClient {
public Client(TestEnvironment environment) throws IOException {
public Client(TestEnvironment environment) throws Exception {
super(environment);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
Expand Down Expand Up @@ -106,8 +105,8 @@ public Client(TestEnvironment environment) throws TTransportException {
}

public static class ClientForNonblockingServer extends SyncEchoTestClient {
public ClientForNonblockingServer(TestEnvironment environment) throws TTransportException {
super(environment, new TFramedTransport(new TSocket(environment.getServerIp(), environment.getPort())));
public ClientForNonblockingServer(TestEnvironment environment) throws Exception {
super(environment, TTransportInstanceCreator.create(SyncEchoTestClient014.class.getClassLoader(), "org.apache.thrift.transport.TFramedTransport", new TSocket(environment.getServerIp(), environment.getPort())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.thrift.common.client;

import com.navercorp.pinpoint.bootstrap.plugin.test.Expectations;
import com.navercorp.pinpoint.bootstrap.plugin.test.ExpectedAnnotation;
import com.navercorp.pinpoint.bootstrap.plugin.test.PluginTestVerifier;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.plugin.thrift.common.TestEnvironment;
import com.navercorp.pinpoint.plugin.thrift.dto.EchoService;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;

import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.event;

/**
* @author HyunGil Jeong
*/
public abstract class SyncEchoTestClient014 implements EchoTestClient {

private final TestEnvironment environment;
private final TTransport transport;

private SyncEchoTestClient014(TestEnvironment environment, TTransport transport) throws TTransportException {
this.environment = environment;
this.transport = transport;
this.transport.open();
}

@Override
public final String echo(String message) throws TException {
TProtocol protocol = this.environment.getProtocolFactory().getProtocol(transport);
EchoService.Client client = new EchoService.Client(protocol);
return client.echo(message);
}

@Override
public void verifyTraces(PluginTestVerifier verifier, String expectedMessage) throws Exception {
// refer to TServiceClientSendBaseInterceptor.getRemoteAddress(...)
final InetSocketAddress socketAddress = this.environment.getServerAddress();
final String hostName = SocketAddressUtils.getHostNameFirst(socketAddress);
final String remoteAddress = HostAndPort.toHostAndPortString(hostName, socketAddress.getPort());

// SpanEvent - TServiceClient.sendBase
Method sendBase = TServiceClient.class.getDeclaredMethod("sendBase", String.class, TBase.class);

ExpectedAnnotation thriftUrl = Expectations.annotation("thrift.url",
remoteAddress + "/com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo");
ExpectedAnnotation thriftArgs = Expectations.annotation("thrift.args",
"echo_args(message:" + expectedMessage + ")");

// SpanEvent - TServiceClient.receiveBase
Method receiveBase = TServiceClient.class.getDeclaredMethod("receiveBase", TBase.class, String.class);
ExpectedAnnotation thriftResult = Expectations.annotation("thrift.result", "echo_result(success:"
+ expectedMessage + ")");

verifier.verifyDiscreteTrace(event("THRIFT_CLIENT", // ServiceType
sendBase, // Method
null, // rpc
null, // endPoint
remoteAddress, // destinationId
thriftUrl, // Annotation("thrift.url")
thriftArgs), // Annotation("thrift.args")
event("THRIFT_CLIENT_INTERNAL", // ServiceType
receiveBase, // Method
thriftResult // Annotation("thrift.result")
));
}

@Override
public void close() {
if (this.transport.isOpen()) {
this.transport.close();
}
}

public static class Client extends SyncEchoTestClient014 {
public Client(TestEnvironment environment) throws TTransportException {
super(environment, new TSocket(environment.getServerIp(), environment.getPort()));
}
}

public static class ClientForNonblockingServer extends SyncEchoTestClient014 {
public ClientForNonblockingServer(TestEnvironment environment) throws Exception {
super(environment, TTransportInstanceCreator.create(SyncEchoTestClient014.class.getClassLoader(), "org.apache.thrift.transport.layered.TFramedTransport", new TSocket(environment.getServerIp(), environment.getPort())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.thrift.common.client;

import org.apache.thrift.transport.TTransport;

import java.lang.reflect.Constructor;

public class TTransportInstanceCreator {

public static TTransport create(ClassLoader classLoader, String className, TTransport transport) throws Exception {
Class<?> clazz = classLoader.loadClass(className);
Constructor<?> constructor = clazz.getConstructor(TTransport.class);
return (TTransport) constructor.newInstance(transport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public static AsyncEchoTestServer<TThreadedSelectorServer> threadedSelectorServe
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer<TThreadedSelectorServer>(server, environment) {
@Override
public SyncEchoTestClient getSynchronousClient() throws TTransportException {
public SyncEchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient.ClientForNonblockingServer(environment);
}

@Override
public AsyncEchoTestClient getAsynchronousClient() throws IOException {
public AsyncEchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
Expand All @@ -108,12 +108,12 @@ public static AsyncEchoTestServer<TNonblockingServer> nonblockingServer(final Te
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer<TNonblockingServer>(server, environment) {
@Override
public SyncEchoTestClient getSynchronousClient() throws TTransportException {
public SyncEchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient.ClientForNonblockingServer(environment);
}

@Override
public AsyncEchoTestClient getAsynchronousClient() throws IOException {
public AsyncEchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
Expand All @@ -127,12 +127,12 @@ public static AsyncEchoTestServer<THsHaServer> halfSyncHalfAsyncServer(final Tes
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer<THsHaServer>(server, environment) {
@Override
public SyncEchoTestClient getSynchronousClient() throws TTransportException {
public SyncEchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient.ClientForNonblockingServer(environment);
}

@Override
public AsyncEchoTestClient getAsynchronousClient() throws IOException {
public AsyncEchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.thrift.common.server;

import com.navercorp.pinpoint.bootstrap.plugin.test.PluginTestVerifier;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.plugin.thrift.common.TestEnvironment;
import com.navercorp.pinpoint.plugin.thrift.common.client.AsyncEchoTestClient;
import com.navercorp.pinpoint.plugin.thrift.common.client.EchoTestClient;
import com.navercorp.pinpoint.plugin.thrift.common.client.SyncEchoTestClient;
import com.navercorp.pinpoint.plugin.thrift.common.client.SyncEchoTestClient014;
import com.navercorp.pinpoint.plugin.thrift.dto.EchoService;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;

import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.annotation;
import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.event;
import static com.navercorp.pinpoint.bootstrap.plugin.test.Expectations.root;

/**
* @author HyunGil Jeong
*/
public abstract class AsyncEchoTestServer014<T extends AbstractNonblockingServer> extends ThriftEchoTestServer<T> {

protected AsyncEchoTestServer014(T server, TestEnvironment environment) {
super(server, environment);
}

@Override
public void verifyServerTraces(PluginTestVerifier verifier) throws Exception {
final InetSocketAddress socketAddress = super.environment.getServerAddress();
final String address = SocketAddressUtils.getAddressFirst(socketAddress);
verifier.verifyTraceCount(2);
// Method process = TBaseAsyncProcessor.class.getDeclaredMethod("process", AsyncFrameBuffer.class);
Method process = TBinaryProtocol.class.getDeclaredMethod("readMessageEnd");
// RootSpan
verifier.verifyTrace(root("THRIFT_SERVER", // ServiceType,
"Thrift Server Invocation", // Method
"com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo", // rpc
HostAndPort.toHostAndPortString(address, socketAddress.getPort()), // endPoint
address // remoteAddress
));
// SpanEvent - TBaseAsyncProcessor.process
verifier.verifyTrace(event("THRIFT_SERVER_INTERNAL", process, annotation("thrift.url", "com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo")));
}

public static class AsyncEchoTestServerFactory {

private static TProcessor getAsyncProcessor() {
return new EchoService.AsyncProcessor<EchoService.AsyncIface>(new EchoService.AsyncIface() {
@Override
public void echo(String message, AsyncMethodCallback<String> resultHandler) throws TException {
resultHandler.onComplete(message);
}
});
}

public static AsyncEchoTestServer014<TThreadedSelectorServer> threadedSelectorServer(
final TestEnvironment environment) throws TTransportException {
TThreadedSelectorServer server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(
new TNonblockingServerSocket(environment.getPort())).processor(getAsyncProcessor())
.inputProtocolFactory(environment.getProtocolFactory())
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer014<TThreadedSelectorServer>(server, environment) {
@Override
public EchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient014.ClientForNonblockingServer(environment);
}

@Override
public EchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
}

public static AsyncEchoTestServer014<TNonblockingServer> nonblockingServer(final TestEnvironment environment)
throws TTransportException {
TNonblockingServer server = new TNonblockingServer(new TNonblockingServer.Args(
new TNonblockingServerSocket(environment.getPort())).processor(getAsyncProcessor())
.inputProtocolFactory(environment.getProtocolFactory())
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer014<TNonblockingServer>(server, environment) {
@Override
public EchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient014.ClientForNonblockingServer(environment);
}

@Override
public EchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
}

public static AsyncEchoTestServer014<THsHaServer> halfSyncHalfAsyncServer(final TestEnvironment environment)
throws TTransportException {
THsHaServer server = new THsHaServer(new THsHaServer.Args(new TNonblockingServerSocket(
environment.getPort())).processor(getAsyncProcessor())
.inputProtocolFactory(environment.getProtocolFactory())
.outputProtocolFactory(environment.getProtocolFactory()));
return new AsyncEchoTestServer014<THsHaServer>(server, environment) {
@Override
public EchoTestClient getSynchronousClient() throws Exception {
return new SyncEchoTestClient014.ClientForNonblockingServer(environment);
}

@Override
public EchoTestClient getAsynchronousClient() throws Exception {
return new AsyncEchoTestClient.Client(environment);
}
};
}
}

}
Loading