Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Support cluster's client and cluster's server in web and collector pr… #1021

Merged
merged 1 commit into from
Oct 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.navercorp.pinpoint.rpc.MessageListener;
import com.navercorp.pinpoint.rpc.PinpointSocket;
import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.cluster.Role;
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode;
import com.navercorp.pinpoint.rpc.packet.PingPacket;
Expand Down Expand Up @@ -63,7 +65,9 @@ public CollectorClusterAcceptor(CollectorClusterConnectionOption option, InetSoc
public void start() {
logger.info("{} initialization started.", ClassUtils.simpleClassName(this));

PinpointServerAcceptor serverAcceptor = new PinpointServerAcceptor();
ClusterOption clusterOption = new ClusterOption(true, option.getClusterId(), Role.ROUTER);

PinpointServerAcceptor serverAcceptor = new PinpointServerAcceptor(clusterOption);
serverAcceptor.setMessageListener(new ClusterServerMessageListener(option.getClusterId(), option.getRouteMessageHandler()));
serverAcceptor.setServerStreamChannelMessageListener(option.getRouteStreamMessageHandler());
serverAcceptor.addStateChangeEventHandler(new WebClusterServerChannelStateChangeHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.navercorp.pinpoint.rpc.PinpointSocket;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.cluster.Role;
import com.navercorp.pinpoint.rpc.util.ClassUtils;
import com.navercorp.pinpoint.rpc.util.ClientFactoryUtils;
import org.slf4j.Logger;
Expand All @@ -47,11 +49,14 @@ public CollectorClusterConnector(CollectorClusterConnectionOption option) {
public void start() {
logger.info("{} initialization started.", ClassUtils.simpleClassName(this));

ClusterOption clusterOption = new ClusterOption(true, option.getClusterId(), Role.ROUTER);

this.clientFactory = new PinpointClientFactory();

this.clientFactory.setTimeoutMillis(1000 * 5);
this.clientFactory.setMessageListener(option.getRouteMessageHandler());
this.clientFactory.setServerStreamChannelMessageListener(option.getRouteStreamMessageHandler());
this.clientFactory.setClusterOption(clusterOption);

Map<String, Object> properties = new HashMap<String, Object>();
properties.put("id", option.getClusterId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.rpc.*;
import com.navercorp.pinpoint.rpc.client.ConnectFuture.Result;
import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.common.SocketStateChangeResult;
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
import com.navercorp.pinpoint.rpc.packet.*;
Expand Down Expand Up @@ -88,6 +89,9 @@ public class DefaultPinpointClientHandler extends SimpleChannelHandler implement
private final ConnectFuture connectFuture = new ConnectFuture();

private final String objectUniqName;

private final ClusterOption localClusterOption;
private ClusterOption remoteClusterOption = ClusterOption.DISABLE_CLUSTER_OPTION;

public DefaultPinpointClientHandler(PinpointClientFactory clientFactory) {
this(clientFactory, DEFAULT_PING_DELAY, DEFAULT_ENABLE_WORKER_PACKET_DELAY, DEFAULT_TIMEOUTMILLIS);
Expand Down Expand Up @@ -116,6 +120,8 @@ public DefaultPinpointClientHandler(PinpointClientFactory clientFactory, long pi
this.socketId = clientFactory.issueNewSocketId();
this.pingIdGenerator = new AtomicInteger(0);
this.state = new PinpointClientHandlerState(this, clientFactory.getStateChangeEventListeners());

this.localClusterOption = clientFactory.getClusterOption();
}

public void setPinpointClient(PinpointClient pinpointClient) {
Expand Down Expand Up @@ -167,7 +173,11 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
Map<String, Object> handshakeData = new HashMap<String, Object>();
handshakeData.putAll(clientFactory.getProperties());
handshakeData.put("socketId", socketId);


if (localClusterOption.isEnable()) {
handshakeData.put("cluster", localClusterOption.getProperties());
}

handshaker.handshakeStart(channel, handshakeData);

connectFuture.setResult(Result.SUCCESS);
Expand Down Expand Up @@ -419,6 +429,7 @@ private void handleHandshakePacket(ControlHandshakeResponsePacket message, Chann
if (code == HandshakeResponseCode.SUCCESS || code == HandshakeResponseCode.ALREADY_KNOWN) {
state.toRunSimplex();
} else if (code == HandshakeResponseCode.DUPLEX_COMMUNICATION || code == HandshakeResponseCode.ALREADY_DUPLEX_COMMUNICATION) {
remoteClusterOption = handshaker.getClusterOption();
state.toRunDuplex();
} else if (code == HandshakeResponseCode.SIMPLEX_COMMUNICATION || code == HandshakeResponseCode.ALREADY_SIMPLEX_COMMUNICATION) {
state.toRunSimplex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import com.navercorp.pinpoint.rpc.MessageListener;
import com.navercorp.pinpoint.rpc.StateChangeEventListener;
import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.cluster.Role;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -81,7 +83,9 @@ public class PinpointClientFactory {
private long pingDelay = DEFAULT_PING_DELAY;
private long enableWorkerPacketDelay = DEFAULT_ENABLE_WORKER_PACKET_DELAY;
private long timeoutMillis = DEFAULT_TIMEOUTMILLIS;


private ClusterOption clusterOption = ClusterOption.DISABLE_CLUSTER_OPTION;

private MessageListener messageListener = SimpleLoggingMessageListener.LISTENER;
private List<StateChangeEventListener> stateChangeEventListeners = new ArrayList<StateChangeEventListener>();
private ServerStreamChannelMessageListener serverStreamChannelMessageListener = DisabledServerStreamChannelMessageListener.INSTANCE;
Expand Down Expand Up @@ -407,6 +411,18 @@ public void setProperties(Map<String, Object> agentProperties) {
this.properties = Collections.unmodifiableMap(agentProperties);
}

public ClusterOption getClusterOption() {
return clusterOption;
}

public void setClusterOption(String id, List<Role> roles) {
this.clusterOption = new ClusterOption(true, id, roles);
}

public void setClusterOption(ClusterOption clusterOption) {
this.clusterOption = clusterOption;
}

public MessageListener getMessageListener() {
return messageListener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@

package com.navercorp.pinpoint.rpc.client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.cluster.Role;
import com.navercorp.pinpoint.rpc.util.*;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
Expand All @@ -34,10 +40,6 @@
import com.navercorp.pinpoint.rpc.packet.ControlHandshakePacket;
import com.navercorp.pinpoint.rpc.packet.ControlHandshakeResponsePacket;
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode;
import com.navercorp.pinpoint.rpc.util.AssertUtils;
import com.navercorp.pinpoint.rpc.util.ClassUtils;
import com.navercorp.pinpoint.rpc.util.ControlMessageEncodingUtils;
import com.navercorp.pinpoint.rpc.util.MapUtils;

public class PinpointClientHandshaker {

Expand All @@ -59,6 +61,7 @@ public class PinpointClientHandshaker {

private final Object lock = new Object();
private final AtomicReference<HandshakeResponseCode> handshakeResult = new AtomicReference<HandshakeResponseCode>(null);
private final AtomicReference<ClusterOption> clusterOption = new AtomicReference<ClusterOption>(null);

private String simpleName;

Expand Down Expand Up @@ -153,38 +156,85 @@ public boolean handshakeComplete(ControlHandshakeResponsePacket message) {
this.state.set(STATE_FINISHED);
return false;
}

HandshakeResponseCode code = getHandshakeResponseCode(message);

Map handshakeResponse = decode(message);

HandshakeResponseCode code = getResponseCode(handshakeResponse);
handshakeResult.compareAndSet(null, code);

ClusterOption clusterOption = getClusterOption(handshakeResponse);
this.clusterOption.compareAndSet(null, clusterOption);

logger.info("{} handshakeComplete method completed. handshakeResult:{} / {}", simpleClassNameAndHashCodeString(), code, handshakeResult.get());
return true;
}
}

private HandshakeResponseCode getHandshakeResponseCode(ControlHandshakeResponsePacket message) {
private Map decode(ControlHandshakeResponsePacket message) {
byte[] payload = message.getPayload();
if (payload == null) {
return HandshakeResponseCode.PROTOCOL_ERROR;
return Collections.EMPTY_MAP;
}

try {
Map result = (Map) ControlMessageEncodingUtils.decode(payload);

int code = MapUtils.getInteger(result, ControlHandshakeResponsePacket.CODE, -1);
int subCode = MapUtils.getInteger(result, ControlHandshakeResponsePacket.SUB_CODE, -1);

return HandshakeResponseCode.getValue(code, subCode);
return result;
} catch (ProtocolException e) {
logger.warn(e.getMessage(), e);

}
return HandshakeResponseCode.UNKNOWN_CODE;

return Collections.EMPTY_MAP;
}


private HandshakeResponseCode getResponseCode(Map handshakeResponse) {
if (handshakeResponse == Collections.EMPTY_MAP) {
return HandshakeResponseCode.PROTOCOL_ERROR;
}

int code = MapUtils.getInteger(handshakeResponse, ControlHandshakeResponsePacket.CODE, -1);
int subCode = MapUtils.getInteger(handshakeResponse, ControlHandshakeResponsePacket.SUB_CODE, -1);

return HandshakeResponseCode.getValue(code, subCode);
}

private ClusterOption getClusterOption(Map handshakeResponse) {
if (handshakeResponse == Collections.EMPTY_MAP) {
return ClusterOption.DISABLE_CLUSTER_OPTION;
}

Map cluster = (Map) handshakeResponse.get(ControlHandshakeResponsePacket.CLUSTER);
if (cluster == null) {
return ClusterOption.DISABLE_CLUSTER_OPTION;
}

String id = MapUtils.getString(cluster, "id", "");
List<Role> roles = getRoles((List) cluster.get("roles"));

if (StringUtils.isEmpty(id)) {
return ClusterOption.DISABLE_CLUSTER_OPTION;
} else {
return new ClusterOption(true, id, roles);
}
}

private List<Role> getRoles(List roleNames) {
List<Role> roles = new ArrayList<Role>();
for (Object roleName : roleNames) {
if (roleName instanceof String && !StringUtils.isEmpty((String) roleName)) {
roles.add(Role.getValue((String) roleName));
}
}
return roles;
}

public HandshakeResponseCode getHandshakeResult() {
return handshakeResult.get();
}

public ClusterOption getClusterOption() {
return clusterOption.get();
}

public void handshakeAbort() {
logger.info("{} handshakeAbort method started.", simpleClassNameAndHashCodeString());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
*
* * Copyright 2014 NAVER Corp.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*
*/

package com.navercorp.pinpoint.rpc.cluster;

import java.util.*;

/**
* @Author Taejin Koo
*/
public class ClusterOption {

public static final ClusterOption DISABLE_CLUSTER_OPTION = new ClusterOption(false, "", Collections.EMPTY_LIST);

private final boolean enable;
private final String id;
private final List<Role> roles;

public ClusterOption(boolean enable, String id, String role) {
this(enable, id, Role.getValue(role));
}

public ClusterOption(boolean enable, String id, Role role) {
this(enable, id, Arrays.asList(role));
}

public ClusterOption(boolean enable, String id, List<Role> roles) {
this.enable = enable;
this.id = id;
this.roles = roles;
}

public boolean isEnable() {
return enable;
}

public String getId() {
return id;
}

public List<Role> getRoles() {
return roles;
}

public Map<String, Object> getProperties() {
if (!enable) {
return Collections.emptyMap();
}

Map<String, Object> clusterProperties = new HashMap<String, Object>(2);
clusterProperties.put("id", id);

List<String> roleList = new ArrayList<String>(roles.size());
for (Role role : roles) {
roleList.add(role.name());
}
clusterProperties.put("roles", roleList);

return clusterProperties;
}

}
43 changes: 43 additions & 0 deletions rpc/src/main/java/com/navercorp/pinpoint/rpc/cluster/Role.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
*
* * Copyright 2014 NAVER Corp.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*
*/

package com.navercorp.pinpoint.rpc.cluster;

/**
* @Author Taejin Koo
*/
public enum Role {

CALLER, CALLEE, ROUTER, UNKNOWN;

public static Role getValue(String name) {
if (name == null) {
return UNKNOWN;
}

for (Role role : Role.values()) {
if (name.equals(role.name())) {
return role;
}
}

return UNKNOWN;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public static boolean isClosed(SocketStateCode code) {
case UNEXPECTED_CLOSE_BY_SERVER:
case ERROR_UNKNOWN:
case ERROR_ILLEGAL_STATE_CHANGE:
case ERROR_SYNC_STATE_SESSION:
return true;
default:
return false;
Expand Down
Loading