Skip to content

Commit

Permalink
Merge pull request #1274 from tuohai666/dev
Browse files Browse the repository at this point in the history
#1172, for proxy
  • Loading branch information
terrymanu authored Sep 18, 2018
2 parents dfcd1cc + b0497b5 commit 1a52f54
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 6 deletions.
18 changes: 15 additions & 3 deletions sharding-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
<artifactId>sharding-transaction</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.shardingsphere</groupId>
<artifactId>sharding-opentracing</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand All @@ -45,7 +49,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand All @@ -60,7 +63,6 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -77,6 +79,16 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-opentracing</artifactId>
<version>5.0.0-RC2</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>5.0.0-RC2</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.shardingsphere.core.yaml.YamlRuleConfiguration;
import io.shardingsphere.core.yaml.other.YamlServerConfiguration;
import io.shardingsphere.jdbc.orchestration.internal.OrchestrationFacade;
import io.shardingsphere.opentracing.ShardingTracer;
import io.shardingsphere.proxy.config.ProxyContext;
import io.shardingsphere.proxy.config.yaml.ProxyConfiguration;
import io.shardingsphere.proxy.config.yaml.ProxyYamlConfigurationLoader;
Expand All @@ -30,6 +31,7 @@
import io.shardingsphere.proxy.listener.ProxyListenerRegister;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.skywalking.apm.toolkit.opentracing.SkywalkingTracer;

import java.io.IOException;
import java.util.HashMap;
Expand All @@ -56,6 +58,7 @@ public final class Bootstrap {
* @throws IOException IO exception
*/
public static void main(final String[] args) throws InterruptedException, IOException {
ShardingTracer.init(new SkywalkingTracer());
ProxyConfiguration proxyConfig = new ProxyYamlConfigurationLoader().load();
int port = getPort(args);
new ProxyListenerRegister().register();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

package io.shardingsphere.proxy.backend.jdbc.connection;

import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.connection.CloseConnectionEvent;
import io.shardingsphere.core.event.connection.CloseConnectionFinishEvent;
import io.shardingsphere.core.event.connection.CloseConnectionStartEvent;
import io.shardingsphere.core.event.connection.GetConnectionEvent;
import io.shardingsphere.core.event.connection.GetConnectionFinishEvent;
import io.shardingsphere.core.event.connection.GetConnectionStartEvent;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaDataFactory;
import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager;
import io.shardingsphere.proxy.config.RuleRegistry;
import lombok.Getter;
Expand Down Expand Up @@ -56,9 +65,22 @@ public final class BackendConnection implements AutoCloseable {
* @throws SQLException SQL exception
*/
public Connection getConnection(final String dataSourceName) throws SQLException {
Connection result = ruleRegistry.getBackendDataSource().getConnection(dataSourceName);
cachedConnections.add(result);
return result;
try {
ShardingEventBusInstance.getInstance().post(new GetConnectionStartEvent(dataSourceName));
Connection result = ruleRegistry.getBackendDataSource().getConnection(dataSourceName);
cachedConnections.add(result);
GetConnectionEvent finishEvent = new GetConnectionFinishEvent(DataSourceMetaDataFactory.newInstance(DatabaseType.MySQL, result.getMetaData().getURL()));
finishEvent.setExecuteSuccess();
ShardingEventBusInstance.getInstance().post(finishEvent);
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
GetConnectionEvent finishEvent = new GetConnectionFinishEvent(null);
finishEvent.setExecuteFailure(ex);
ShardingEventBusInstance.getInstance().post(finishEvent);
throw ex;
}
}

/**
Expand Down Expand Up @@ -127,11 +149,17 @@ private Collection<SQLException> closeStatements() {

private Collection<SQLException> closeConnections() {
Collection<SQLException> result = new LinkedList<>();
CloseConnectionEvent finishEvent = new CloseConnectionFinishEvent();
for (Connection each : cachedConnections) {
try {
ShardingEventBusInstance.getInstance().post(new CloseConnectionStartEvent(each.getCatalog(), DataSourceMetaDataFactory.newInstance(DatabaseType.MySQL, each.getMetaData().getURL())));
each.close();
finishEvent.setExecuteSuccess();
ShardingEventBusInstance.getInstance().post(finishEvent);
} catch (SQLException ex) {
finishEvent.setExecuteFailure(ex);
result.add(ex);
ShardingEventBusInstance.getInstance().post(finishEvent);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.root.RootInvokeEvent;
import io.shardingsphere.proxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.proxy.config.ProxyContext;
import io.shardingsphere.proxy.frontend.common.FrontendHandler;
Expand Down Expand Up @@ -112,6 +114,7 @@ class CommandExecutor implements Runnable {

@Override
public void run() {
ShardingEventBusInstance.getInstance().post(new RootInvokeEvent());
try (MySQLPacketPayload payload = new MySQLPacketPayload(message);
BackendConnection backendConnection = new BackendConnection(ProxyContext.getInstance().getRuleRegistry(frontendHandler.getCurrentSchema()))) {
setBackendConnection(backendConnection);
Expand All @@ -133,6 +136,9 @@ public void run() {
// CHECKSTYLE:ON
context.writeAndFlush(new ErrPacket(1, ServerErrorCode.ER_STD_UNKNOWN_EXCEPTION, ex.getMessage()));
}
RootInvokeEvent finishEvent = new RootInvokeEvent();
finishEvent.setExecuteSuccess();
ShardingEventBusInstance.getInstance().post(finishEvent);
}

private CommandPacket getCommandPacket(final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException {
Expand Down

0 comments on commit 1a52f54

Please # to comment.