diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/StreamRouteHandler.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/StreamRouteHandler.java index 1e1e20722839..98e52cac00af 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/StreamRouteHandler.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/StreamRouteHandler.java @@ -126,7 +126,7 @@ private TCommandTransferResponse onRoute0(StreamEvent event) { private ClientStreamChannelContext createStreamChannel(PinpointServerClusterPoint clusterPoint, byte[] payload, ClientStreamChannelMessageListener messageListener) { PinpointServer pinpointServer = clusterPoint.getPinpointServer(); - return pinpointServer.createStream(payload, messageListener); + return pinpointServer.openStream(payload, messageListener); } public void close(ServerStreamChannelContext consumerContext) { diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/PinpointSocket.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/PinpointSocket.java index 89a250da6af1..f8fb152a0c09 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/PinpointSocket.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/PinpointSocket.java @@ -18,6 +18,8 @@ import com.navercorp.pinpoint.rpc.cluster.ClusterOption; import com.navercorp.pinpoint.rpc.packet.RequestPacket; +import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelContext; +import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelMessageListener; import java.net.SocketAddress; @@ -33,6 +35,8 @@ public interface PinpointSocket { void response(RequestPacket requestPacket, byte[] payload); void response(int requestId, byte[] payload); + ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener); + SocketAddress getRemoteAddress(); void close(); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java index d571303d1da1..4f0d8994de3f 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java @@ -356,11 +356,11 @@ public Future request(byte[] bytes) { } @Override - public ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { + public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { ensureOpen(); PinpointClientHandlerContext context = getChannelContext(channel); - return context.createStream(payload, clientStreamChannelMessageListener); + return context.openStream(payload, clientStreamChannelMessageListener); } @Override @@ -502,27 +502,28 @@ public void close() { private void closeChannel() { Channel channel = this.channel; if (channel != null) { - closeStreamChannelManager(channel); sendClosedPacket(channel); - + ChannelFuture closeFuture = channel.close(); closeFuture.addListener(new WriteFailFutureListener(logger, "close() event failed.", "close() event success.")); closeFuture.awaitUninterruptibly(); } } - + // Calling this method on a closed PinpointClientHandler has no effect. private void closeResources() { logger.debug("{} closeResources() started.", objectUniqName); + Channel channel = this.channel; + closeStreamChannelManager(channel); this.handshaker.handshakeAbort(); this.requestManager.close(); this.channelTimer.stop(); } private void closeStreamChannelManager(Channel channel) { - if (!channel.isConnected()) { - logger.debug("channel already closed. skip closeStreamChannelManager() {}", channel); + if (channel == null) { + logger.debug("channel already set null. skip closeStreamChannelManager() {}", channel); return; } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClient.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClient.java index ad5f4251a48c..36c0722014bb 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClient.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClient.java @@ -135,6 +135,14 @@ public void response(int requestId, byte[] payload) { pinpointClientHandler.response(requestId, payload); } + @Override + public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { + // StreamChannel must be changed into interface in order to throw the StreamChannel that returns failure. + // fow now throw just exception + ensureOpen(); + return pinpointClientHandler.openStream(payload, clientStreamChannelMessageListener); + } + @Override public SocketAddress getRemoteAddress() { return pinpointClientHandler.getRemoteAddress(); @@ -150,13 +158,6 @@ public ClusterOption getRemoteClusterOption() { return pinpointClientHandler.getRemoteClusterOption(); } - public ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { - // StreamChannel must be changed into interface in order to throw the StreamChannel that returns failure. - // fow now throw just exception - ensureOpen(); - return pinpointClientHandler.createStreamChannel(payload, clientStreamChannelMessageListener); - } - public StreamChannelContext findStreamChannel(int streamChannelId) { ensureOpen(); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandler.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandler.java index 699f23603832..2d201e0bf851 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandler.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandler.java @@ -52,7 +52,7 @@ public interface PinpointClientHandler { void response(int requestId, byte[] payload); - ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener); + ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener); StreamChannelContext findStreamChannel(int streamChannelId); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerContext.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerContext.java index 1b4ce81d720e..8c3722213fa6 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerContext.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerContext.java @@ -46,8 +46,8 @@ public Channel getChannel() { return channel; } - public ClientStreamChannelContext createStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { - return streamChannelManager.openStreamChannel(payload, clientStreamChannelMessageListener); + public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { + return streamChannelManager.openStream(payload, clientStreamChannelMessageListener); } public void handleStreamEvent(StreamPacket message) { diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerState.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerState.java index 5e41415cc05f..49a52aa52f41 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerState.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientHandlerState.java @@ -18,7 +18,6 @@ import com.navercorp.pinpoint.rpc.PinpointSocket; import com.navercorp.pinpoint.rpc.StateChangeEventListener; -import com.navercorp.pinpoint.rpc.server.PinpointServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +117,7 @@ private SocketStateChangeResult to(SocketStateCode nextState) { logger.debug("{} stateTo() started. to:{}", objectName, nextState); - SocketStateChangeResult stateChangeResult = state.changeState(nextState); + SocketStateChangeResult stateChangeResult = state.to(nextState); if (stateChangeResult.isChange()) { executeChangeEventHandler(pinpointSocket, nextState); } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/ReconnectStateClientHandler.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/ReconnectStateClientHandler.java index eeaeb0224f8f..fa9756ec3001 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/ReconnectStateClientHandler.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/ReconnectStateClientHandler.java @@ -100,7 +100,7 @@ public void response(int requestId, byte[] payload) { } @Override - public ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { + public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { throw new UnsupportedOperationException(); } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketState.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketState.java index eae05fc72232..a8b508a4f996 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketState.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/common/SocketState.java @@ -24,7 +24,7 @@ public class SocketState { private SocketStateCode beforeState = SocketStateCode.NONE; private SocketStateCode currentState = SocketStateCode.NONE; - public synchronized SocketStateChangeResult changeState(SocketStateCode nextState) { + public synchronized SocketStateChangeResult to(SocketStateCode nextState) { boolean enable = this.currentState.canChangeState(nextState); if (enable) { this.beforeState = this.currentState; @@ -35,79 +35,79 @@ public synchronized SocketStateChangeResult changeState(SocketStateCode nextStat return new SocketStateChangeResult(false, beforeState, currentState, nextState); } - public SocketStateChangeResult stateToBeingConnect() { + public SocketStateChangeResult toBeingConnect() { SocketStateCode nextState = SocketStateCode.BEING_CONNECT; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToConnected() { + public SocketStateChangeResult toConnected() { SocketStateCode nextState = SocketStateCode.CONNECTED; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToConnectFailed() { + public SocketStateChangeResult toConnectFailed() { SocketStateCode nextState = SocketStateCode.CONNECT_FAILED; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToIgnore() { + public SocketStateChangeResult toIgnore() { SocketStateCode nextState = SocketStateCode.IGNORE; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToRunWithoutHandshake() { + public SocketStateChangeResult toRunWithoutHandshake() { SocketStateCode nextState = SocketStateCode.RUN_WITHOUT_HANDSHAKE; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToRunSimplex() { + public SocketStateChangeResult toRunSimplex() { SocketStateCode nextState = SocketStateCode.RUN_SIMPLEX; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToRunDuplex() { + public SocketStateChangeResult toRunDuplex() { SocketStateCode nextState = SocketStateCode.RUN_DUPLEX; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToBeingCloseByClient() { + public SocketStateChangeResult toBeingCloseByClient() { SocketStateCode nextState = SocketStateCode.BEING_CLOSE_BY_CLIENT; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToClosedByClient() { + public SocketStateChangeResult toClosedByClient() { SocketStateCode nextState = SocketStateCode.CLOSED_BY_CLIENT; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToUnexpectedCloseByClient() { + public SocketStateChangeResult toUnexpectedCloseByClient() { SocketStateCode nextState = SocketStateCode.UNEXPECTED_CLOSE_BY_CLIENT; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToBeingCloseByServer() { + public SocketStateChangeResult toBeingCloseByServer() { SocketStateCode nextState = SocketStateCode.BEING_CLOSE_BY_SERVER; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToClosedByServer() { + public SocketStateChangeResult toClosedByServer() { SocketStateCode nextState = SocketStateCode.CLOSED_BY_SERVER; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToUnexpectedCloseByServer() { + public SocketStateChangeResult toUnexpectedCloseByServer() { SocketStateCode nextState = SocketStateCode.UNEXPECTED_CLOSE_BY_SERVER; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToUnknownError() { + public SocketStateChangeResult toUnknownError() { SocketStateCode nextState = SocketStateCode.ERROR_UNKNOWN; - return changeState(nextState); + return to(nextState); } - public SocketStateChangeResult stateToSyncStateSessionError() { + public SocketStateChangeResult toSyncStateSessionError() { SocketStateCode nextState = SocketStateCode.ERROR_SYNC_STATE_SESSION; - return changeState(nextState); + return to(nextState); } public synchronized SocketStateCode getCurrentState() { diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java index a7fea40d10bd..90c72615460f 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java @@ -135,27 +135,29 @@ public void stop() { } public void stop(boolean serverStop) { - SocketStateCode currentStateCode = getCurrentStateCode(); - if (SocketStateCode.BEING_CLOSE_BY_SERVER == currentStateCode) { - state.toClosed(); - } else if (SocketStateCode.BEING_CLOSE_BY_CLIENT == currentStateCode) { - state.toClosedByPeer(); - } else if (SocketStateCode.isRun(currentStateCode) && serverStop) { - state.toUnexpectedClosed(); - } else if (SocketStateCode.isRun(currentStateCode)) { - state.toUnexpectedClosedByPeer(); - } else if (SocketStateCode.isClosed(currentStateCode)) { - logger.warn("{} stop(). Socket has closed state({}).", objectUniqName, currentStateCode); - } else { - state.toErrorUnknown(); - logger.warn("{} stop(). Socket has unexpected state.", objectUniqName, currentStateCode); - } - - if (this.channel.isConnected()) { - channel.close(); + try { + SocketStateCode currentStateCode = getCurrentStateCode(); + if (SocketStateCode.BEING_CLOSE_BY_SERVER == currentStateCode) { + state.toClosed(); + } else if (SocketStateCode.BEING_CLOSE_BY_CLIENT == currentStateCode) { + state.toClosedByPeer(); + } else if (SocketStateCode.isRun(currentStateCode) && serverStop) { + state.toUnexpectedClosed(); + } else if (SocketStateCode.isRun(currentStateCode)) { + state.toUnexpectedClosedByPeer(); + } else if (SocketStateCode.isClosed(currentStateCode)) { + logger.warn("{} stop(). Socket has closed state({}).", objectUniqName, currentStateCode); + } else { + state.toErrorUnknown(); + logger.warn("{} stop(). Socket has unexpected state.", objectUniqName, currentStateCode); + } + + if (this.channel.isConnected()) { + channel.close(); + } + } finally { + streamChannelManager.close(); } - - streamChannelManager.close(); } @Override @@ -215,10 +217,10 @@ public StreamChannelContext getStreamChannel(int channelId) { } @Override - public ClientStreamChannelContext createStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { + public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { logger.info("{} createStream() started.", objectUniqName); - ClientStreamChannelContext streamChannel = streamChannelManager.openStreamChannel(payload, clientStreamChannelMessageListener); + ClientStreamChannelContext streamChannel = streamChannelManager.openStream(payload, clientStreamChannelMessageListener); logger.info("{} createStream() completed.", objectUniqName); return streamChannel; diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServerState.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServerState.java index 43a5b3ef828a..e2865ab2e3b1 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServerState.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServerState.java @@ -109,7 +109,7 @@ private SocketStateChangeResult to(SocketStateCode nextState) { logger.debug("{} stateTo() started. to:{}", objectUniqName, nextState); - SocketStateChangeResult stateChangeResult = state.changeState(nextState); + SocketStateChangeResult stateChangeResult = state.to(nextState); if (stateChangeResult.isChange()) { executeChangeEventHandler(pinpointServer, nextState); } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServer.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServer.java index b56a45585eb2..97ae33a9008c 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServer.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/server/PinpointServer.java @@ -32,8 +32,6 @@ */ public interface PinpointServer extends PinpointSocket { - ClientStreamChannelContext createStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener); - void messageReceived(Object message); SocketStateCode getCurrentStateCode(); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/LoggingStreamChannelStateChangeEventHandler.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/LoggingStreamChannelStateChangeEventHandler.java index 3ffac1ebc101..176097b4867c 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/LoggingStreamChannelStateChangeEventHandler.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/LoggingStreamChannelStateChangeEventHandler.java @@ -27,22 +27,13 @@ public class LoggingStreamChannelStateChangeEventHandler implements StreamChanne private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Override - public void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode) throws Exception { - logger.info(createMessage(true, streamChannel, oldStateCode, updatedStateCode)); + public void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode) throws Exception { + logger.info("eventPerformed streamChannel:{}, stateCode:{}", streamChannel, updatedStateCode); } @Override - public void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode, Throwable e) { - logger.info("{} message={}", createMessage(false, streamChannel, oldStateCode, updatedStateCode), e.getMessage(), e); - } - - private String createMessage(boolean isSuccess, StreamChannel streamChannel, StreamChannelStateCode oldState, StreamChannelStateCode updatedState) { - StringBuilder message = new StringBuilder(32); - message.append("Change state to "); - message.append(oldState).append("->").append(updatedState); - message.append("(").append(isSuccess ? "SUCCESS" : "FAIL").append(")"); - message.append(" [Channel:").append(streamChannel).append("] "); - return message.toString(); + public void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode, Throwable e) { + logger.warn("exceptionCaught message:{}, streamChannel:{}, stateCode:{}", e.getMessage(), streamChannel, updatedStateCode, e); } } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannel.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannel.java index 76a243d91298..9243a4736f0d 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannel.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannel.java @@ -167,7 +167,7 @@ boolean checkState(StreamChannelStateCode currentCode, StreamChannelStateCode ex protected boolean changeStateTo(StreamChannelStateCode nextState) { StreamChannelStateCode currentState = getCurrentState(); - boolean isChanged = state.changeStateTo(currentState, nextState); + boolean isChanged = state.to(currentState, nextState); if (!isChanged && (getCurrentState() != StreamChannelStateCode.ILLEGAL_STATE)) { changeStateTo(StreamChannelStateCode.ILLEGAL_STATE); } @@ -175,9 +175,9 @@ protected boolean changeStateTo(StreamChannelStateCode nextState) { if (isChanged) { for (StreamChannelStateChangeEventHandler h : stateChangeEventHandlers) { try { - h.eventPerformed(this, currentState, nextState); + h.eventPerformed(this, nextState); } catch (Exception e) { - h.exceptionCaught(this, currentState, nextState, e); + h.exceptionCaught(this, nextState, e); } } } @@ -204,5 +204,4 @@ public String toString() { return sb.toString(); } - } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManager.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManager.java index f2119e073be3..87dda775b6c7 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManager.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManager.java @@ -76,7 +76,7 @@ public void close() { } - public ClientStreamChannelContext openStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { + public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) { logger.info("Open streamChannel initialization started. Channel:{} ", channel); final int streamChannelId = idGenerator.generate(); @@ -304,8 +304,12 @@ private ChannelFuture sendCreateFail(int streamChannelId, short code) { } private ChannelFuture sendClose(int streamChannelId, short code) { - StreamClosePacket packet = new StreamClosePacket(streamChannelId, code); - return this.channel.write(packet); + if (channel.isConnected()) { + StreamClosePacket packet = new StreamClosePacket(streamChannelId, code); + return this.channel.write(packet); + } else { + return null; + } } private boolean isServerStreamChannelContext(StreamChannelContext context) { diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelState.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelState.java index 37ea44ca24dc..9e0f86c26098 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelState.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelState.java @@ -33,11 +33,11 @@ public StreamChannelStateCode getCurrentState() { return currentStateReference.get(); } - boolean changeStateTo(StreamChannelStateCode nextState) { - return changeStateTo(currentStateReference.get(), nextState); + boolean to(StreamChannelStateCode nextState) { + return to(currentStateReference.get(), nextState); } - boolean changeStateTo(StreamChannelStateCode currentState, StreamChannelStateCode nextState) { + boolean to(StreamChannelStateCode currentState, StreamChannelStateCode nextState) { if (!nextState.canChangeState(currentState)) { return false; } diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateChangeEventHandler.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateChangeEventHandler.java index e7c295546b44..0927605b6a35 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateChangeEventHandler.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateChangeEventHandler.java @@ -24,8 +24,8 @@ */ public interface StreamChannelStateChangeEventHandler { - void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode) throws Exception; + void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode) throws Exception; - void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode, Throwable e); + void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode, Throwable e); } diff --git a/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManagerTest.java b/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManagerTest.java index 7d04ad8423de..708f3e39823c 100644 --- a/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManagerTest.java +++ b/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelManagerTest.java @@ -58,7 +58,7 @@ public void streamSuccessTest1() throws IOException, InterruptedException { RecordedStreamChannelMessageListener clientListener = new RecordedStreamChannelMessageListener(4); - ClientStreamChannelContext clientContext = client.createStreamChannel(new byte[0], clientListener); + ClientStreamChannelContext clientContext = client.openStream(new byte[0], clientListener); int sendCount = 4; @@ -92,10 +92,10 @@ public void streamSuccessTest2() throws IOException, InterruptedException { PinpointClient client = clientFactory.connect("127.0.0.1", bindPort); RecordedStreamChannelMessageListener clientListener = new RecordedStreamChannelMessageListener(4); - ClientStreamChannelContext clientContext = client.createStreamChannel(new byte[0], clientListener); + ClientStreamChannelContext clientContext = client.openStream(new byte[0], clientListener); RecordedStreamChannelMessageListener clientListener2 = new RecordedStreamChannelMessageListener(4); - ClientStreamChannelContext clientContext2 = client.createStreamChannel(new byte[0], clientListener2); + ClientStreamChannelContext clientContext2 = client.openStream(new byte[0], clientListener2); int sendCount = 4; @@ -154,7 +154,7 @@ public void streamSuccessTest3() throws IOException, InterruptedException { RecordedStreamChannelMessageListener clientListener = new RecordedStreamChannelMessageListener(4); if (writableServer instanceof PinpointServer) { - ClientStreamChannelContext clientContext = ((PinpointServer)writableServer).createStream(new byte[0], clientListener); + ClientStreamChannelContext clientContext = ((PinpointServer)writableServer).openStream(new byte[0], clientListener); int sendCount = 4; @@ -189,7 +189,7 @@ public void streamClosedTest1() throws IOException, InterruptedException { RecordedStreamChannelMessageListener clientListener = new RecordedStreamChannelMessageListener(4); - ClientStreamChannelContext clientContext = client.createStreamChannel(new byte[0], clientListener); + ClientStreamChannelContext clientContext = client.openStream(new byte[0], clientListener); Thread.sleep(100); @@ -217,7 +217,7 @@ public void streamClosedTest2() throws IOException, InterruptedException { RecordedStreamChannelMessageListener clientListener = new RecordedStreamChannelMessageListener(4); - ClientStreamChannelContext clientContext = client.createStreamChannel(new byte[0], clientListener); + ClientStreamChannelContext clientContext = client.openStream(new byte[0], clientListener); Thread.sleep(100); Assert.assertEquals(1, bo.getStreamChannelContextSize()); @@ -260,7 +260,7 @@ public void streamClosedTest3() throws IOException, InterruptedException { if (writableServer instanceof PinpointServer) { RecordedStreamChannelMessageListener clientListener = new RecordedStreamChannelMessageListener(4); - ClientStreamChannelContext clientContext = ((PinpointServer)writableServer).createStream(new byte[0], clientListener); + ClientStreamChannelContext clientContext = ((PinpointServer)writableServer).openStream(new byte[0], clientListener); StreamChannelContext aaa = client.findStreamChannel(2); diff --git a/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateTest.java b/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateTest.java index 287e1382e097..0715fffdaa5b 100644 --- a/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateTest.java +++ b/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelStateTest.java @@ -27,16 +27,16 @@ public void functionTest1() { StreamChannelState state = new StreamChannelState(); Assert.assertEquals(StreamChannelStateCode.NEW, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.OPEN); + state.to(StreamChannelStateCode.OPEN); Assert.assertEquals(StreamChannelStateCode.OPEN, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.CONNECT_AWAIT); + state.to(StreamChannelStateCode.CONNECT_AWAIT); Assert.assertEquals(StreamChannelStateCode.CONNECT_AWAIT, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.CONNECTED); + state.to(StreamChannelStateCode.CONNECTED); Assert.assertEquals(StreamChannelStateCode.CONNECTED, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.CLOSED); + state.to(StreamChannelStateCode.CLOSED); Assert.assertEquals(StreamChannelStateCode.CLOSED, state.getCurrentState()); } @@ -45,16 +45,16 @@ public void functionTest2() { StreamChannelState state = new StreamChannelState(); Assert.assertEquals(StreamChannelStateCode.NEW, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.OPEN); + state.to(StreamChannelStateCode.OPEN); Assert.assertEquals(StreamChannelStateCode.OPEN, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.CONNECT_ARRIVED); + state.to(StreamChannelStateCode.CONNECT_ARRIVED); Assert.assertEquals(StreamChannelStateCode.CONNECT_ARRIVED, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.CONNECTED); + state.to(StreamChannelStateCode.CONNECTED); Assert.assertEquals(StreamChannelStateCode.CONNECTED, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.CLOSED); + state.to(StreamChannelStateCode.CLOSED); Assert.assertEquals(StreamChannelStateCode.CLOSED, state.getCurrentState()); } @@ -63,7 +63,7 @@ public void functionTest3() { StreamChannelState state = new StreamChannelState(); Assert.assertEquals(StreamChannelStateCode.NEW, state.getCurrentState()); - boolean result = state.changeStateTo(StreamChannelStateCode.CONNECTED); + boolean result = state.to(StreamChannelStateCode.CONNECTED); Assert.assertFalse(result); } @@ -72,10 +72,10 @@ public void functionTest4() { StreamChannelState state = new StreamChannelState(); Assert.assertEquals(StreamChannelStateCode.NEW, state.getCurrentState()); - state.changeStateTo(StreamChannelStateCode.OPEN); + state.to(StreamChannelStateCode.OPEN); Assert.assertEquals(StreamChannelStateCode.OPEN, state.getCurrentState()); - boolean result = state.changeStateTo(StreamChannelStateCode.CONNECTED); + boolean result = state.to(StreamChannelStateCode.CONNECTED); Assert.assertFalse(result); } diff --git a/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelTest.java b/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelTest.java index 19d5a06fe9ee..18bf47f83298 100644 --- a/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelTest.java +++ b/rpc/src/test/java/com/navercorp/pinpoint/rpc/stream/StreamChannelTest.java @@ -67,13 +67,13 @@ class TestStateChangeHandler implements StreamChannelStateChangeEventHandler { @Override - public void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode) throws Exception { + public void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode) throws Exception { this.latestEventPerformedStateCode = updatedStateCode; this.totalEventPerformedCount++; } @Override - public void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode, Throwable e) { + public void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode, Throwable e) { } public StreamChannelStateCode getLatestEventPerformedStateCode() { diff --git a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentService.java b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentService.java index d833bd8951d8..10ac5bb9ad58 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentService.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentService.java @@ -19,6 +19,8 @@ package com.navercorp.pinpoint.web.service; +import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelContext; +import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelMessageListener; import com.navercorp.pinpoint.web.cluster.PinpointRouteResponse; import com.navercorp.pinpoint.web.vo.AgentActiveThreadCountList; import com.navercorp.pinpoint.web.vo.AgentInfo; @@ -38,16 +40,19 @@ public interface AgentService { AgentInfo getAgentInfo(String applicationName, String agentId, long startTimeStamp, boolean checkDB); List getAgentInfoList(String applicationName); - PinpointRouteResponse invoke(AgentInfo agentInfoList, TBase tBase) throws TException; - PinpointRouteResponse invoke(AgentInfo agentInfoList, TBase tBase, long timeout) throws TException; - PinpointRouteResponse invoke(AgentInfo agentInfoList, byte[] payload) throws TException; - PinpointRouteResponse invoke(AgentInfo agentInfoList, byte[] payload, long timeout) throws TException; + PinpointRouteResponse invoke(AgentInfo agentInfo, TBase tBase) throws TException; + PinpointRouteResponse invoke(AgentInfo agentInfo, TBase tBase, long timeout) throws TException; + PinpointRouteResponse invoke(AgentInfo agentInfo, byte[] payload) throws TException; + PinpointRouteResponse invoke(AgentInfo agentInfo, byte[] payload, long timeout) throws TException; Map invoke(List agentInfoList, TBase tBase) throws TException; Map invoke(List agentInfoList, TBase tBase, long timeout) throws TException; Map invoke(List agentInfoList, byte[] payload) throws TException; Map invoke(List agentInfoList, byte[] payload, long timeout) throws TException; + ClientStreamChannelContext openStream(AgentInfo agentInfo, TBase tBase, ClientStreamChannelMessageListener clientStreamChannelMessageListener) throws TException; + ClientStreamChannelContext openStream(AgentInfo agentInfo, byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) throws TException; + AgentActiveThreadCountList getActiveThreadCount(List agentInfoList) throws TException; AgentActiveThreadCountList getActiveThreadCount(List agentInfoList, byte[] payload) throws TException; diff --git a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentServiceImpl.java b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentServiceImpl.java index 0ba5fd500ddf..3499d4f211a8 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentServiceImpl.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentServiceImpl.java @@ -22,6 +22,8 @@ import com.navercorp.pinpoint.rpc.Future; import com.navercorp.pinpoint.rpc.PinpointSocket; import com.navercorp.pinpoint.rpc.ResponseMessage; +import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelContext; +import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelMessageListener; import com.navercorp.pinpoint.rpc.util.ListUtils; import com.navercorp.pinpoint.thrift.dto.command.TCmdActiveThreadCount; import com.navercorp.pinpoint.thrift.dto.command.TCmdActiveThreadCountRes; @@ -200,6 +202,24 @@ public Map invoke(List agentInfoLis return result; } + @Override + public ClientStreamChannelContext openStream(AgentInfo agentInfo, TBase tBase, ClientStreamChannelMessageListener clientStreamChannelMessageListener) throws TException { + byte[] payload = serialize(tBase); + return openStream(agentInfo, payload, clientStreamChannelMessageListener); + } + + @Override + public ClientStreamChannelContext openStream(AgentInfo agentInfo, byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) throws TException { + TCommandTransfer transferObject = createCommandTransferObject(agentInfo, payload); + PinpointSocket socket = clusterConnectionManager.getSocket(agentInfo); + + if (socket == null) { + return socket.openStream(serialize(transferObject), clientStreamChannelMessageListener); + } + + return null; + } + @Override public AgentActiveThreadCountList getActiveThreadCount(List agentInfoList) throws TException { byte[] activeThread = serialize(new TCmdActiveThreadCount());