From 991a71a914a1e9095ffa6b9bebe0e21dc699d927 Mon Sep 17 00:00:00 2001 From: "koo.taejin" Date: Fri, 2 Oct 2015 14:12:50 +0900 Subject: [PATCH] Support cluster's client and cluster's server in web and collector project.. #1017 added cluster properties when socket handshake. --- .../connection/CollectorClusterAcceptor.java | 6 +- .../connection/CollectorClusterConnector.java | 5 ++ .../client/DefaultPinpointClientHandler.java | 13 ++- .../rpc/client/PinpointClientFactory.java | 18 +++- .../rpc/client/PinpointClientHandshaker.java | 86 +++++++++++++++---- .../pinpoint/rpc/cluster/ClusterOption.java | 78 +++++++++++++++++ .../navercorp/pinpoint/rpc/cluster/Role.java | 43 ++++++++++ .../pinpoint/rpc/common/SocketStateCode.java | 1 + .../ControlHandshakeResponsePacket.java | 2 + .../rpc/server/DefaultPinpointServer.java | 47 +++++++++- .../rpc/server/PinpointServerAcceptor.java | 14 +++ .../rpc/server/PinpointServerConfig.java | 3 + .../pinpoint/rpc/util/StringUtils.java | 34 ++++++++ 13 files changed, 326 insertions(+), 24 deletions(-) create mode 100644 rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/ClusterOption.java create mode 100644 rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/Role.java create mode 100644 rpc/src/main/java/com/navercorp/pinpoint/rpc/util/StringUtils.java diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterAcceptor.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterAcceptor.java index 68f436fc6752..b6a1404619f8 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterAcceptor.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterAcceptor.java @@ -21,6 +21,8 @@ import com.navercorp.pinpoint.rpc.MessageListener; import com.navercorp.pinpoint.rpc.PinpointSocket; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; +import com.navercorp.pinpoint.rpc.cluster.Role; import com.navercorp.pinpoint.rpc.common.SocketStateCode; import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode; import com.navercorp.pinpoint.rpc.packet.PingPacket; @@ -63,7 +65,9 @@ public CollectorClusterAcceptor(CollectorClusterConnectionOption option, InetSoc public void start() { logger.info("{} initialization started.", ClassUtils.simpleClassName(this)); - PinpointServerAcceptor serverAcceptor = new PinpointServerAcceptor(); + ClusterOption clusterOption = new ClusterOption(true, option.getClusterId(), Role.ROUTER); + + PinpointServerAcceptor serverAcceptor = new PinpointServerAcceptor(clusterOption); serverAcceptor.setMessageListener(new ClusterServerMessageListener(option.getClusterId(), option.getRouteMessageHandler())); serverAcceptor.setServerStreamChannelMessageListener(option.getRouteStreamMessageHandler()); serverAcceptor.addStateChangeEventHandler(new WebClusterServerChannelStateChangeHandler()); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnector.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnector.java index 1f360180bf01..8a6bed9fbbd2 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnector.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnector.java @@ -21,6 +21,8 @@ import com.navercorp.pinpoint.rpc.PinpointSocket; import com.navercorp.pinpoint.rpc.client.PinpointClientFactory; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; +import com.navercorp.pinpoint.rpc.cluster.Role; import com.navercorp.pinpoint.rpc.util.ClassUtils; import com.navercorp.pinpoint.rpc.util.ClientFactoryUtils; import org.slf4j.Logger; @@ -47,11 +49,14 @@ public CollectorClusterConnector(CollectorClusterConnectionOption option) { public void start() { logger.info("{} initialization started.", ClassUtils.simpleClassName(this)); + ClusterOption clusterOption = new ClusterOption(true, option.getClusterId(), Role.ROUTER); + this.clientFactory = new PinpointClientFactory(); this.clientFactory.setTimeoutMillis(1000 * 5); this.clientFactory.setMessageListener(option.getRouteMessageHandler()); this.clientFactory.setServerStreamChannelMessageListener(option.getRouteStreamMessageHandler()); + this.clientFactory.setClusterOption(clusterOption); Map properties = new HashMap(); properties.put("id", option.getClusterId()); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java index 2a10196652ee..f627e419ccc9 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java @@ -18,6 +18,7 @@ import com.navercorp.pinpoint.rpc.*; import com.navercorp.pinpoint.rpc.client.ConnectFuture.Result; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; import com.navercorp.pinpoint.rpc.common.SocketStateChangeResult; import com.navercorp.pinpoint.rpc.common.SocketStateCode; import com.navercorp.pinpoint.rpc.packet.*; @@ -88,6 +89,9 @@ public class DefaultPinpointClientHandler extends SimpleChannelHandler implement private final ConnectFuture connectFuture = new ConnectFuture(); private final String objectUniqName; + + private final ClusterOption localClusterOption; + private ClusterOption remoteClusterOption = ClusterOption.DISABLE_CLUSTER_OPTION; public DefaultPinpointClientHandler(PinpointClientFactory clientFactory) { this(clientFactory, DEFAULT_PING_DELAY, DEFAULT_ENABLE_WORKER_PACKET_DELAY, DEFAULT_TIMEOUTMILLIS); @@ -116,6 +120,8 @@ public DefaultPinpointClientHandler(PinpointClientFactory clientFactory, long pi this.socketId = clientFactory.issueNewSocketId(); this.pingIdGenerator = new AtomicInteger(0); this.state = new PinpointClientHandlerState(this, clientFactory.getStateChangeEventListeners()); + + this.localClusterOption = clientFactory.getClusterOption(); } public void setPinpointClient(PinpointClient pinpointClient) { @@ -167,7 +173,11 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr Map handshakeData = new HashMap(); handshakeData.putAll(clientFactory.getProperties()); handshakeData.put("socketId", socketId); - + + if (localClusterOption.isEnable()) { + handshakeData.put("cluster", localClusterOption.getProperties()); + } + handshaker.handshakeStart(channel, handshakeData); connectFuture.setResult(Result.SUCCESS); @@ -419,6 +429,7 @@ private void handleHandshakePacket(ControlHandshakeResponsePacket message, Chann if (code == HandshakeResponseCode.SUCCESS || code == HandshakeResponseCode.ALREADY_KNOWN) { state.toRunSimplex(); } else if (code == HandshakeResponseCode.DUPLEX_COMMUNICATION || code == HandshakeResponseCode.ALREADY_DUPLEX_COMMUNICATION) { + remoteClusterOption = handshaker.getClusterOption(); state.toRunDuplex(); } else if (code == HandshakeResponseCode.SIMPLEX_COMMUNICATION || code == HandshakeResponseCode.ALREADY_SIMPLEX_COMMUNICATION) { state.toRunSimplex(); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java index c82f0406cb6e..62c327111e2a 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java @@ -27,6 +27,8 @@ import com.navercorp.pinpoint.rpc.MessageListener; import com.navercorp.pinpoint.rpc.StateChangeEventListener; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; +import com.navercorp.pinpoint.rpc.cluster.Role; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; @@ -81,7 +83,9 @@ public class PinpointClientFactory { private long pingDelay = DEFAULT_PING_DELAY; private long enableWorkerPacketDelay = DEFAULT_ENABLE_WORKER_PACKET_DELAY; private long timeoutMillis = DEFAULT_TIMEOUTMILLIS; - + + private ClusterOption clusterOption = ClusterOption.DISABLE_CLUSTER_OPTION; + private MessageListener messageListener = SimpleLoggingMessageListener.LISTENER; private List stateChangeEventListeners = new ArrayList(); private ServerStreamChannelMessageListener serverStreamChannelMessageListener = DisabledServerStreamChannelMessageListener.INSTANCE; @@ -407,6 +411,18 @@ public void setProperties(Map agentProperties) { this.properties = Collections.unmodifiableMap(agentProperties); } + public ClusterOption getClusterOption() { + return clusterOption; + } + + public void setClusterOption(String id, List roles) { + this.clusterOption = new ClusterOption(true, id, roles); + } + + public void setClusterOption(ClusterOption clusterOption) { + this.clusterOption = clusterOption; + } + public MessageListener getMessageListener() { return messageListener; } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandshaker.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandshaker.java index 2f3397d51f92..c131b8b78424 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandshaker.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandshaker.java @@ -16,11 +16,17 @@ package com.navercorp.pinpoint.rpc.client; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; +import com.navercorp.pinpoint.rpc.cluster.Role; +import com.navercorp.pinpoint.rpc.util.*; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; @@ -34,10 +40,6 @@ import com.navercorp.pinpoint.rpc.packet.ControlHandshakePacket; import com.navercorp.pinpoint.rpc.packet.ControlHandshakeResponsePacket; import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode; -import com.navercorp.pinpoint.rpc.util.AssertUtils; -import com.navercorp.pinpoint.rpc.util.ClassUtils; -import com.navercorp.pinpoint.rpc.util.ControlMessageEncodingUtils; -import com.navercorp.pinpoint.rpc.util.MapUtils; public class PinpointClientHandshaker { @@ -59,6 +61,7 @@ public class PinpointClientHandshaker { private final Object lock = new Object(); private final AtomicReference handshakeResult = new AtomicReference(null); + private final AtomicReference clusterOption = new AtomicReference(null); private String simpleName; @@ -153,38 +156,85 @@ public boolean handshakeComplete(ControlHandshakeResponsePacket message) { this.state.set(STATE_FINISHED); return false; } - - HandshakeResponseCode code = getHandshakeResponseCode(message); + + Map handshakeResponse = decode(message); + + HandshakeResponseCode code = getResponseCode(handshakeResponse); handshakeResult.compareAndSet(null, code); + + ClusterOption clusterOption = getClusterOption(handshakeResponse); + this.clusterOption.compareAndSet(null, clusterOption); + logger.info("{} handshakeComplete method completed. handshakeResult:{} / {}", simpleClassNameAndHashCodeString(), code, handshakeResult.get()); return true; } } - private HandshakeResponseCode getHandshakeResponseCode(ControlHandshakeResponsePacket message) { + private Map decode(ControlHandshakeResponsePacket message) { byte[] payload = message.getPayload(); if (payload == null) { - return HandshakeResponseCode.PROTOCOL_ERROR; + return Collections.EMPTY_MAP; } - + try { Map result = (Map) ControlMessageEncodingUtils.decode(payload); - - int code = MapUtils.getInteger(result, ControlHandshakeResponsePacket.CODE, -1); - int subCode = MapUtils.getInteger(result, ControlHandshakeResponsePacket.SUB_CODE, -1); - - return HandshakeResponseCode.getValue(code, subCode); + return result; } catch (ProtocolException e) { - logger.warn(e.getMessage(), e); + } - - return HandshakeResponseCode.UNKNOWN_CODE; + + return Collections.EMPTY_MAP; } - + + private HandshakeResponseCode getResponseCode(Map handshakeResponse) { + if (handshakeResponse == Collections.EMPTY_MAP) { + return HandshakeResponseCode.PROTOCOL_ERROR; + } + + int code = MapUtils.getInteger(handshakeResponse, ControlHandshakeResponsePacket.CODE, -1); + int subCode = MapUtils.getInteger(handshakeResponse, ControlHandshakeResponsePacket.SUB_CODE, -1); + + return HandshakeResponseCode.getValue(code, subCode); + } + + private ClusterOption getClusterOption(Map handshakeResponse) { + if (handshakeResponse == Collections.EMPTY_MAP) { + return ClusterOption.DISABLE_CLUSTER_OPTION; + } + + Map cluster = (Map) handshakeResponse.get(ControlHandshakeResponsePacket.CLUSTER); + if (cluster == null) { + return ClusterOption.DISABLE_CLUSTER_OPTION; + } + + String id = MapUtils.getString(cluster, "id", ""); + List roles = getRoles((List) cluster.get("roles")); + + if (StringUtils.isEmpty(id)) { + return ClusterOption.DISABLE_CLUSTER_OPTION; + } else { + return new ClusterOption(true, id, roles); + } + } + + private List getRoles(List roleNames) { + List roles = new ArrayList(); + for (Object roleName : roleNames) { + if (roleName instanceof String && !StringUtils.isEmpty((String) roleName)) { + roles.add(Role.getValue((String) roleName)); + } + } + return roles; + } + public HandshakeResponseCode getHandshakeResult() { return handshakeResult.get(); } + public ClusterOption getClusterOption() { + return clusterOption.get(); + } + public void handshakeAbort() { logger.info("{} handshakeAbort method started.", simpleClassNameAndHashCodeString()); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/ClusterOption.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/ClusterOption.java new file mode 100644 index 000000000000..66b11bf96671 --- /dev/null +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/ClusterOption.java @@ -0,0 +1,78 @@ +/* + * + * * Copyright 2014 NAVER Corp. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + * + */ + +package com.navercorp.pinpoint.rpc.cluster; + +import java.util.*; + +/** + * @Author Taejin Koo + */ +public class ClusterOption { + + public static final ClusterOption DISABLE_CLUSTER_OPTION = new ClusterOption(false, "", Collections.EMPTY_LIST); + + private final boolean enable; + private final String id; + private final List roles; + + public ClusterOption(boolean enable, String id, String role) { + this(enable, id, Role.getValue(role)); + } + + public ClusterOption(boolean enable, String id, Role role) { + this(enable, id, Arrays.asList(role)); + } + + public ClusterOption(boolean enable, String id, List roles) { + this.enable = enable; + this.id = id; + this.roles = roles; + } + + public boolean isEnable() { + return enable; + } + + public String getId() { + return id; + } + + public List getRoles() { + return roles; + } + + public Map getProperties() { + if (!enable) { + return Collections.emptyMap(); + } + + Map clusterProperties = new HashMap(2); + clusterProperties.put("id", id); + + List roleList = new ArrayList(roles.size()); + for (Role role : roles) { + roleList.add(role.name()); + } + clusterProperties.put("roles", roleList); + + return clusterProperties; + } + +} diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/Role.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/Role.java new file mode 100644 index 000000000000..11fd8059a412 --- /dev/null +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/Role.java @@ -0,0 +1,43 @@ +/* + * + * * Copyright 2014 NAVER Corp. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + * + */ + +package com.navercorp.pinpoint.rpc.cluster; + +/** + * @Author Taejin Koo + */ +public enum Role { + + CALLER, CALLEE, ROUTER, UNKNOWN; + + public static Role getValue(String name) { + if (name == null) { + return UNKNOWN; + } + + for (Role role : Role.values()) { + if (name.equals(role.name())) { + return role; + } + } + + return UNKNOWN; + } + +} diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketStateCode.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketStateCode.java index 7449f5213e99..3792b7fe6235 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketStateCode.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketStateCode.java @@ -142,6 +142,7 @@ public static boolean isClosed(SocketStateCode code) { case UNEXPECTED_CLOSE_BY_SERVER: case ERROR_UNKNOWN: case ERROR_ILLEGAL_STATE_CHANGE: + case ERROR_SYNC_STATE_SESSION: return true; default: return false; diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/packet/ControlHandshakeResponsePacket.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/packet/ControlHandshakeResponsePacket.java index 0beacc4fa511..a8c25bfec584 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/packet/ControlHandshakeResponsePacket.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/packet/ControlHandshakeResponsePacket.java @@ -26,6 +26,8 @@ public class ControlHandshakeResponsePacket extends ControlPacket { public static final String CODE = "code"; public static final String SUB_CODE = "subCode"; + + public static final String CLUSTER = "cluster"; public ControlHandshakeResponsePacket(byte[] payload) { super(payload); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java index f93f6a1c0bbc..3e38aa169355 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java @@ -21,6 +21,8 @@ import com.navercorp.pinpoint.rpc.ResponseMessage; import com.navercorp.pinpoint.rpc.client.RequestManager; import com.navercorp.pinpoint.rpc.client.WriteFailFutureListener; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; +import com.navercorp.pinpoint.rpc.cluster.Role; import com.navercorp.pinpoint.rpc.common.CyclicStateChecker; import com.navercorp.pinpoint.rpc.common.SocketStateChangeResult; import com.navercorp.pinpoint.rpc.common.SocketStateCode; @@ -66,7 +68,10 @@ public class DefaultPinpointServer implements PinpointServer { private final AtomicReference> properties = new AtomicReference>(); private final String objectUniqName; - + + private final ClusterOption localClusterOption; + private ClusterOption remoteClusterOption; + private final ChannelFutureListener serverCloseWriteListener; private final ChannelFutureListener responseWriteFailListener; @@ -108,6 +113,8 @@ public DefaultPinpointServer(Channel channel, PinpointServerConfig serverConfig, this.state = new DefaultPinpointServerState(this, this.stateChangeEventListeners); this.stateChecker = new CyclicStateChecker(5); + + this.localClusterOption = serverConfig.getClusterOption(); } public void start() { @@ -347,10 +354,11 @@ private void handleHandshake(ControlHandshakePacket handshakepacket) { boolean isFirst = setChannelProperties(handshakeData); if (isFirst) { if (HandshakeResponseCode.DUPLEX_COMMUNICATION == responseCode) { + this.remoteClusterOption = getClusterOption(handshakeData); state.toRunDuplex(); } else if (HandshakeResponseCode.SIMPLEX_COMMUNICATION == responseCode || HandshakeResponseCode.SUCCESS == responseCode) { state.toRunSimplex(); - } + } } logger.info("{} handleHandshake(). ResponseCode:{}", objectUniqName, responseCode); @@ -361,6 +369,36 @@ private void handleHandshake(ControlHandshakePacket handshakepacket) { logger.info("{} handleHandshake() completed.", objectUniqName); } + private ClusterOption getClusterOption(Map handshakeResponse) { + if (handshakeResponse == Collections.EMPTY_MAP) { + return ClusterOption.DISABLE_CLUSTER_OPTION; + } + + Map cluster = (Map) handshakeResponse.get(ControlHandshakeResponsePacket.CLUSTER); + if (cluster == null) { + return ClusterOption.DISABLE_CLUSTER_OPTION; + } + + String id = MapUtils.getString(cluster, "id", ""); + List roles = getRoles((List) cluster.get("roles")); + + if (StringUtils.isEmpty(id)) { + return ClusterOption.DISABLE_CLUSTER_OPTION; + } else { + return new ClusterOption(true, id, roles); + } + } + + private List getRoles(List roleNames) { + List roles = new ArrayList(); + for (Object roleName : roleNames) { + if (roleName instanceof String && !StringUtils.isEmpty((String) roleName)) { + roles.add(Role.getValue((String) roleName)); + } + } + return roles; + } + private void handleClosePacket(Channel channel) { logger.info("{} handleClosePacket() started.", objectUniqName); @@ -412,6 +450,9 @@ private Map createHandshakeResponse(HandshakeResponseCode respon Map result = new HashMap(); result.put(ControlHandshakeResponsePacket.CODE, createdCode.getCode()); result.put(ControlHandshakeResponsePacket.SUB_CODE, createdCode.getSubCode()); + if (localClusterOption.isEnable()) { + result.put(ControlHandshakeResponsePacket.CLUSTER, localClusterOption.getProperties()); + } return result; } @@ -436,7 +477,7 @@ private Map decodeHandshakePacket(ControlHandshakePacket message logger.warn(e.getMessage(), e); } - return null; + return Collections.EMPTY_MAP; } public boolean isEnableCommunication() { diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerAcceptor.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerAcceptor.java index fd073c848a82..971287902520 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerAcceptor.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerAcceptor.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; @@ -88,6 +89,8 @@ public class PinpointServerAcceptor implements PinpointServerConfig { private final Timer healthCheckTimer; private final Timer requestManagerTimer; + private final ClusterOption clusterOption; + private long defaultRequestTimeout = DEFAULT_TIMEOUTMILLIS; static { @@ -95,6 +98,10 @@ public class PinpointServerAcceptor implements PinpointServerConfig { } public PinpointServerAcceptor() { + this(ClusterOption.DISABLE_CLUSTER_OPTION); + } + + public PinpointServerAcceptor(ClusterOption clusterOption) { ServerBootstrap bootstrap = createBootStrap(1, WORKER_COUNT); setOptions(bootstrap); addPipeline(bootstrap); @@ -102,6 +109,8 @@ public PinpointServerAcceptor() { this.healthCheckTimer = TimerFactory.createHashedWheelTimer("PinpointServerSocket-HealthCheckTimer", 50, TimeUnit.MILLISECONDS, 512); this.requestManagerTimer = TimerFactory.createHashedWheelTimer("PinpointServerSocket-RequestManager", 50, TimeUnit.MILLISECONDS, 512); + + this.clusterOption = clusterOption; } private ServerBootstrap createBootStrap(int bossCount, int workerCount) { @@ -238,6 +247,11 @@ public Timer getRequestManagerTimer() { return requestManagerTimer; } + @Override + public ClusterOption getClusterOption() { + return clusterOption; + } + private void sendPing() { logger.debug("sendPing"); final TimerTask pintTask = new TimerTask() { diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerConfig.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerConfig.java index 9b669a7b8f5b..a99486ca77e0 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerConfig.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServerConfig.java @@ -18,6 +18,7 @@ import java.util.List; +import com.navercorp.pinpoint.rpc.cluster.ClusterOption; import org.jboss.netty.util.Timer; import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler; @@ -38,4 +39,6 @@ public interface PinpointServerConfig { ServerStreamChannelMessageListener getStreamMessageListener(); + ClusterOption getClusterOption(); + } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/util/StringUtils.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/util/StringUtils.java new file mode 100644 index 000000000000..fc465e336bba --- /dev/null +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/util/StringUtils.java @@ -0,0 +1,34 @@ +/* + * + * * Copyright 2014 NAVER Corp. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + * + */ + +package com.navercorp.pinpoint.rpc.util; + +/** + * @Author Taejin Koo + */ +public final class StringUtils { + + private StringUtils() { + } + + public static boolean isEmpty(String string) { + return string == null || string.isEmpty(); + } + +}