Skip to content

Commit

Permalink
Implement realtime span delivery service. pinpoint-apm#560
Browse files Browse the repository at this point in the history
supported  openStream api for AgentService class and PinpointSocket class.
  • Loading branch information
koo-taejin committed Oct 7, 2015
1 parent eec0eb1 commit 1837921
Show file tree
Hide file tree
Showing 22 changed files with 150 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {

private ClientStreamChannelContext createStreamChannel(PinpointServerClusterPoint clusterPoint, byte[] payload, ClientStreamChannelMessageListener messageListener) {
PinpointServer pinpointServer = clusterPoint.getPinpointServer();
return pinpointServer.createStream(payload, messageListener);
return pinpointServer.openStream(payload, messageListener);
}

public void close(ServerStreamChannelContext consumerContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelMessageListener;

import java.net.SocketAddress;

Expand All @@ -33,6 +35,8 @@ public interface PinpointSocket {
void response(RequestPacket requestPacket, byte[] payload);
void response(int requestId, byte[] payload);

ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener);

SocketAddress getRemoteAddress();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ public Future<ResponseMessage> request(byte[] bytes) {
}

@Override
public ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
ensureOpen();

PinpointClientHandlerContext context = getChannelContext(channel);
return context.createStream(payload, clientStreamChannelMessageListener);
return context.openStream(payload, clientStreamChannelMessageListener);
}

@Override
Expand Down Expand Up @@ -502,27 +502,28 @@ public void close() {
private void closeChannel() {
Channel channel = this.channel;
if (channel != null) {
closeStreamChannelManager(channel);
sendClosedPacket(channel);

ChannelFuture closeFuture = channel.close();
closeFuture.addListener(new WriteFailFutureListener(logger, "close() event failed.", "close() event success."));
closeFuture.awaitUninterruptibly();
}
}

// Calling this method on a closed PinpointClientHandler has no effect.
private void closeResources() {
logger.debug("{} closeResources() started.", objectUniqName);

Channel channel = this.channel;
closeStreamChannelManager(channel);
this.handshaker.handshakeAbort();
this.requestManager.close();
this.channelTimer.stop();
}

private void closeStreamChannelManager(Channel channel) {
if (!channel.isConnected()) {
logger.debug("channel already closed. skip closeStreamChannelManager() {}", channel);
if (channel == null) {
logger.debug("channel already set null. skip closeStreamChannelManager() {}", channel);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ public void response(int requestId, byte[] payload) {
pinpointClientHandler.response(requestId, payload);
}

@Override
public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
// StreamChannel must be changed into interface in order to throw the StreamChannel that returns failure.
// fow now throw just exception
ensureOpen();
return pinpointClientHandler.openStream(payload, clientStreamChannelMessageListener);
}

@Override
public SocketAddress getRemoteAddress() {
return pinpointClientHandler.getRemoteAddress();
Expand All @@ -150,13 +158,6 @@ public ClusterOption getRemoteClusterOption() {
return pinpointClientHandler.getRemoteClusterOption();
}

public ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
// StreamChannel must be changed into interface in order to throw the StreamChannel that returns failure.
// fow now throw just exception
ensureOpen();
return pinpointClientHandler.createStreamChannel(payload, clientStreamChannelMessageListener);
}

public StreamChannelContext findStreamChannel(int streamChannelId) {

ensureOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface PinpointClientHandler {

void response(int requestId, byte[] payload);

ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener);
ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener);

StreamChannelContext findStreamChannel(int streamChannelId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public Channel getChannel() {
return channel;
}

public ClientStreamChannelContext createStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
return streamChannelManager.openStreamChannel(payload, clientStreamChannelMessageListener);
public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
return streamChannelManager.openStream(payload, clientStreamChannelMessageListener);
}

public void handleStreamEvent(StreamPacket message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.navercorp.pinpoint.rpc.PinpointSocket;
import com.navercorp.pinpoint.rpc.StateChangeEventListener;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,7 +117,7 @@ private SocketStateChangeResult to(SocketStateCode nextState) {

logger.debug("{} stateTo() started. to:{}", objectName, nextState);

SocketStateChangeResult stateChangeResult = state.changeState(nextState);
SocketStateChangeResult stateChangeResult = state.to(nextState);
if (stateChangeResult.isChange()) {
executeChangeEventHandler(pinpointSocket, nextState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void response(int requestId, byte[] payload) {
}

@Override
public ClientStreamChannelContext createStreamChannel(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class SocketState {
private SocketStateCode beforeState = SocketStateCode.NONE;
private SocketStateCode currentState = SocketStateCode.NONE;

public synchronized SocketStateChangeResult changeState(SocketStateCode nextState) {
public synchronized SocketStateChangeResult to(SocketStateCode nextState) {
boolean enable = this.currentState.canChangeState(nextState);
if (enable) {
this.beforeState = this.currentState;
Expand All @@ -35,79 +35,79 @@ public synchronized SocketStateChangeResult changeState(SocketStateCode nextStat
return new SocketStateChangeResult(false, beforeState, currentState, nextState);
}

public SocketStateChangeResult stateToBeingConnect() {
public SocketStateChangeResult toBeingConnect() {
SocketStateCode nextState = SocketStateCode.BEING_CONNECT;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToConnected() {
public SocketStateChangeResult toConnected() {
SocketStateCode nextState = SocketStateCode.CONNECTED;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToConnectFailed() {
public SocketStateChangeResult toConnectFailed() {
SocketStateCode nextState = SocketStateCode.CONNECT_FAILED;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToIgnore() {
public SocketStateChangeResult toIgnore() {
SocketStateCode nextState = SocketStateCode.IGNORE;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToRunWithoutHandshake() {
public SocketStateChangeResult toRunWithoutHandshake() {
SocketStateCode nextState = SocketStateCode.RUN_WITHOUT_HANDSHAKE;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToRunSimplex() {
public SocketStateChangeResult toRunSimplex() {
SocketStateCode nextState = SocketStateCode.RUN_SIMPLEX;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToRunDuplex() {
public SocketStateChangeResult toRunDuplex() {
SocketStateCode nextState = SocketStateCode.RUN_DUPLEX;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToBeingCloseByClient() {
public SocketStateChangeResult toBeingCloseByClient() {
SocketStateCode nextState = SocketStateCode.BEING_CLOSE_BY_CLIENT;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToClosedByClient() {
public SocketStateChangeResult toClosedByClient() {
SocketStateCode nextState = SocketStateCode.CLOSED_BY_CLIENT;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToUnexpectedCloseByClient() {
public SocketStateChangeResult toUnexpectedCloseByClient() {
SocketStateCode nextState = SocketStateCode.UNEXPECTED_CLOSE_BY_CLIENT;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToBeingCloseByServer() {
public SocketStateChangeResult toBeingCloseByServer() {
SocketStateCode nextState = SocketStateCode.BEING_CLOSE_BY_SERVER;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToClosedByServer() {
public SocketStateChangeResult toClosedByServer() {
SocketStateCode nextState = SocketStateCode.CLOSED_BY_SERVER;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToUnexpectedCloseByServer() {
public SocketStateChangeResult toUnexpectedCloseByServer() {
SocketStateCode nextState = SocketStateCode.UNEXPECTED_CLOSE_BY_SERVER;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToUnknownError() {
public SocketStateChangeResult toUnknownError() {
SocketStateCode nextState = SocketStateCode.ERROR_UNKNOWN;
return changeState(nextState);
return to(nextState);
}

public SocketStateChangeResult stateToSyncStateSessionError() {
public SocketStateChangeResult toSyncStateSessionError() {
SocketStateCode nextState = SocketStateCode.ERROR_SYNC_STATE_SESSION;
return changeState(nextState);
return to(nextState);
}

public synchronized SocketStateCode getCurrentState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,29 @@ public void stop() {
}

public void stop(boolean serverStop) {
SocketStateCode currentStateCode = getCurrentStateCode();
if (SocketStateCode.BEING_CLOSE_BY_SERVER == currentStateCode) {
state.toClosed();
} else if (SocketStateCode.BEING_CLOSE_BY_CLIENT == currentStateCode) {
state.toClosedByPeer();
} else if (SocketStateCode.isRun(currentStateCode) && serverStop) {
state.toUnexpectedClosed();
} else if (SocketStateCode.isRun(currentStateCode)) {
state.toUnexpectedClosedByPeer();
} else if (SocketStateCode.isClosed(currentStateCode)) {
logger.warn("{} stop(). Socket has closed state({}).", objectUniqName, currentStateCode);
} else {
state.toErrorUnknown();
logger.warn("{} stop(). Socket has unexpected state.", objectUniqName, currentStateCode);
}

if (this.channel.isConnected()) {
channel.close();
try {
SocketStateCode currentStateCode = getCurrentStateCode();
if (SocketStateCode.BEING_CLOSE_BY_SERVER == currentStateCode) {
state.toClosed();
} else if (SocketStateCode.BEING_CLOSE_BY_CLIENT == currentStateCode) {
state.toClosedByPeer();
} else if (SocketStateCode.isRun(currentStateCode) && serverStop) {
state.toUnexpectedClosed();
} else if (SocketStateCode.isRun(currentStateCode)) {
state.toUnexpectedClosedByPeer();
} else if (SocketStateCode.isClosed(currentStateCode)) {
logger.warn("{} stop(). Socket has closed state({}).", objectUniqName, currentStateCode);
} else {
state.toErrorUnknown();
logger.warn("{} stop(). Socket has unexpected state.", objectUniqName, currentStateCode);
}

if (this.channel.isConnected()) {
channel.close();
}
} finally {
streamChannelManager.close();
}

streamChannelManager.close();
}

@Override
Expand Down Expand Up @@ -215,10 +217,10 @@ public StreamChannelContext getStreamChannel(int channelId) {
}

@Override
public ClientStreamChannelContext createStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener) {
logger.info("{} createStream() started.", objectUniqName);

ClientStreamChannelContext streamChannel = streamChannelManager.openStreamChannel(payload, clientStreamChannelMessageListener);
ClientStreamChannelContext streamChannel = streamChannelManager.openStream(payload, clientStreamChannelMessageListener);

logger.info("{} createStream() completed.", objectUniqName);
return streamChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private SocketStateChangeResult to(SocketStateCode nextState) {

logger.debug("{} stateTo() started. to:{}", objectUniqName, nextState);

SocketStateChangeResult stateChangeResult = state.changeState(nextState);
SocketStateChangeResult stateChangeResult = state.to(nextState);
if (stateChangeResult.isChange()) {
executeChangeEventHandler(pinpointServer, nextState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
*/
public interface PinpointServer extends PinpointSocket {

ClientStreamChannelContext createStream(byte[] payload, ClientStreamChannelMessageListener clientStreamChannelMessageListener);

void messageReceived(Object message);

SocketStateCode getCurrentStateCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,13 @@ public class LoggingStreamChannelStateChangeEventHandler implements StreamChanne
private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode) throws Exception {
logger.info(createMessage(true, streamChannel, oldStateCode, updatedStateCode));
public void eventPerformed(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode) throws Exception {
logger.info("eventPerformed streamChannel:{}, stateCode:{}", streamChannel, updatedStateCode);
}

@Override
public void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode oldStateCode, StreamChannelStateCode updatedStateCode, Throwable e) {
logger.info("{} message={}", createMessage(false, streamChannel, oldStateCode, updatedStateCode), e.getMessage(), e);
}

private String createMessage(boolean isSuccess, StreamChannel streamChannel, StreamChannelStateCode oldState, StreamChannelStateCode updatedState) {
StringBuilder message = new StringBuilder(32);
message.append("Change state to ");
message.append(oldState).append("->").append(updatedState);
message.append("(").append(isSuccess ? "SUCCESS" : "FAIL").append(")");
message.append(" [Channel:").append(streamChannel).append("] ");
return message.toString();
public void exceptionCaught(StreamChannel streamChannel, StreamChannelStateCode updatedStateCode, Throwable e) {
logger.warn("exceptionCaught message:{}, streamChannel:{}, stateCode:{}", e.getMessage(), streamChannel, updatedStateCode, e);
}

}
Loading

0 comments on commit 1837921

Please # to comment.