Skip to content

Commit

Permalink
Add ActiveMQ java client support (experimental)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xylus committed Apr 20, 2016
1 parent d3e5f52 commit 0317a98
Show file tree
Hide file tree
Showing 21 changed files with 1,268 additions and 0 deletions.
5 changes: 5 additions & 0 deletions plugins/activemq-client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/target/
/.settings/
/.classpath
/.project
/*.iml
31 changes: 31 additions & 0 deletions plugins/activemq-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pom</artifactId>
<relativePath>../..</relativePath>
<version>1.6.0-SNAPSHOT</version>
</parent>

<artifactId>pinpoint-activemq-client-plugin</artifactId>
<name>pinpoint-activemq-client-plugin</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-bootstrap-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2016 Naver Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.activemq.client;

import com.navercorp.pinpoint.common.trace.AnnotationKey;
import com.navercorp.pinpoint.common.trace.AnnotationKeyFactory;
import com.navercorp.pinpoint.common.trace.AnnotationKeyProperty;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.common.trace.ServiceTypeFactory;

import static com.navercorp.pinpoint.common.trace.ServiceTypeProperty.QUEUE;
import static com.navercorp.pinpoint.common.trace.ServiceTypeProperty.RECORD_STATISTICS;

/**
* @author HyunGil Jeong
*/
public final class ActiveMQClientConstants {

private ActiveMQClientConstants() {

}

public static final ServiceType ACTIVEMQ_CLIENT = ServiceTypeFactory.of(8310, "ACTIVEMQ_CLIENT", QUEUE, RECORD_STATISTICS);
public static final ServiceType ACTIVEMQ_CLIENT_INTERNAL = ServiceTypeFactory.of(8311, "ACTIVEMQ_CLIENT_INTERNAL", "ACTIVE_MQ_CLIENT");

public static final AnnotationKey ACTIVEMQ_BROKER_URL = AnnotationKeyFactory.of(101, "activemq.broker.address", AnnotationKeyProperty.VIEW_IN_RECORD_SET);
public static final AnnotationKey ACTIVEMQ_MESSAGE = AnnotationKeyFactory.of(102, "activemq.message", AnnotationKeyProperty.VIEW_IN_RECORD_SET);

public static final String UNKNOWN_ADDRESS = "Unknown";

public static final String ACTIVEMQ_CLIENT_SCOPE = "ActiveMQClientScope";

private static final String PLUGIN_BASE = "com.navercorp.pinpoint.plugin.activemq.client";
private static final String INTERCEPTOR_BASE = PLUGIN_BASE + ".interceptor";

public static final String ACTIVEMQ_TCP_TRANSPORT_FQCN = "org.apache.activemq.transport.tcp.TcpTransport";

public static final String ACTIVEMQ_MESSAGE_DISPATCH_CHANNEL_FIFO_FQCN = "org.apache.activemq.FifoMessageDispatchChannel";
public static final String ACTIVEMQ_MESSAGE_DISPATCH_CHANNEL_SIMPLE_PRIORITY_FQCN = "org.apache.activemq.SimplePriorityMessageDispatchChannel";
public static final String ACTIVEMQ_MESSAGE_DISPATCH_CHANNEL_ENQUEUE_INTERCEPTOR_FQCN = INTERCEPTOR_BASE + ".MessageDispatchChannelEnqueueInterceptor";
public static final String ACTIVEMQ_MESSAGE_DISPATCH_CHANNEL_DEQUEUE_INTERCEPTOR_FQCN = INTERCEPTOR_BASE + ".MessageDispatchChannelDequeueInterceptor";

public static final String ACTIVEMQ_MESSAGE_PRODUCER_FQCN = "org.apache.activemq.ActiveMQMessageProducer";
public static final String ACTIVEMQ_MESSAGE_PRODUCER_SEND_INTERCEPTOR_FQCN = INTERCEPTOR_BASE + ".ActiveMQMessageProducerSendInterceptor";

public static final String ACTIVEMQ_MESSAGE_CONSUMER_FQCN = "org.apache.activemq.ActiveMQMessageConsumer";
public static final String ACTIVEMQ_MESSAGE_CONSUMER_DISPATCH_INTERCEPTOR_FQCN = INTERCEPTOR_BASE + ".ActiveMQMessageConsumerDispatchInterceptor";
public static final String ACTIVEMQ_MESSAGE_CONSUMER_RECEIVE_INTERCEPTOR_FQCN = INTERCEPTOR_BASE + ".ActiveMQMessageConsumerReceiveInterceptor";

// field names
public static final String FIELD_ACTIVEMQ_MESSAGE_PRODUCER_SESSION = "session";
public static final String FIELD_ACTIVEMQ_MESSAGE_CONSUMER_SESSION = "session";
public static final String FIELD_TCP_TRANSPORT_SOCKET = "socket";

private static final String FIELD_BASE = PLUGIN_BASE + ".field";
private static final String FIELD_GETTER_BASE = FIELD_BASE + ".getter";

// field getter FQCN
public static final String FIELD_GETTER_ACTIVEMQ_SESSION = FIELD_GETTER_BASE + ".ActiveMQSessionGetter";
public static final String FIELD_GETTER_SOCKET = FIELD_GETTER_BASE + ".SocketGetter";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2016 Naver Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.activemq.client;

import org.apache.activemq.command.ActiveMQMessage;

import javax.jms.JMSException;
import javax.jms.Message;

/**
* @author HyunGil Jeong
*/
public enum ActiveMQClientHeader {
ACTIVEMQ_TRACE_ID("Pinpoint-TraceID"),
ACTIVEMQ_SPAN_ID("Pinpoint-SpanID"),
ACTIVEMQ_PARENT_SPAN_ID("Pinpoint-pSpanID"),
ACTIVEMQ_SAMPLED("Pinpoint-Sampled"),
ACTIVEMQ_FLAGS("Pinpoint-Flags"),
ACTIVEMQ_PARENT_APPLICATION_NAME("Pinpoint-pAppName"),
ACTIVEMQ_PARENT_APPLICATION_TYPE("Pinpoint-pAppType");
// ACTIVEMQ_HOST("Pinpoint-Host");

private final String id;

ActiveMQClientHeader(String id) {
this.id = id;
}

public String getId() {
return this.id;
}

private interface MessageHandler<T> {
void setMessage(Message message, ActiveMQClientHeader key, T value) throws JMSException;

T getMessage(Message message, ActiveMQClientHeader key, T defaultValue);
}

private static abstract class MessageHandlerBase<T> implements MessageHandler<T> {

@Override
public final void setMessage(Message message, ActiveMQClientHeader key, T value) throws JMSException {
String id = key.id;
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMQMessage = (ActiveMQMessage) message;
if (activeMQMessage.isReadOnlyProperties()) {
activeMQMessage.setReadOnlyProperties(false);
setMessage0(message, id, value);
activeMQMessage.setReadOnlyProperties(true);
return;
}
}
setMessage0(message, id, value);
}

@Override
public final T getMessage(Message message, ActiveMQClientHeader key, T defaultValue) {
String id = key.id;
try {
if (message.propertyExists(id)) {
return getMessage0(message, id);
}
} catch (JMSException e) {
// just ignore and return default value
}
return defaultValue;
}

protected abstract void setMessage0(Message message, String id, T value) throws JMSException;

protected abstract T getMessage0(Message message, String id) throws JMSException;
}

private static final MessageHandler<String> STRING_MESSAGE_HANDLER = new MessageHandlerBase<String>() {

@Override
protected void setMessage0(Message message, String id, String value) throws JMSException {
message.setStringProperty(id, value);
}

@Override
protected String getMessage0(Message message, String id) throws JMSException {
return message.getStringProperty(id);
}
};

private static final MessageHandler<Long> LONG_MESSAGE_HANDLER = new MessageHandlerBase<Long>() {

@Override
protected void setMessage0(Message message, String id, Long value) throws JMSException {
message.setLongProperty(id, value);
}

@Override
protected Long getMessage0(Message message, String id) throws JMSException {
return message.getLongProperty(id);
}
};

private static final MessageHandler<Short> SHORT_MESSAGE_HANDLER = new MessageHandlerBase<Short>() {

@Override
protected void setMessage0(Message message, String id, Short value) throws JMSException {
message.setShortProperty(id, value);
}

@Override
protected Short getMessage0(Message message, String id) throws JMSException {
return message.getShortProperty(id);
}
};

private static final MessageHandler<Boolean> BOOLEAN_MESSAGE_HANDLER = new MessageHandlerBase<Boolean>() {

@Override
protected void setMessage0(Message message, String id, Boolean value) throws JMSException {
message.setBooleanProperty(id, value);
}

@Override
protected Boolean getMessage0(Message message, String id) throws JMSException {
return message.getBooleanProperty(id);
}
};

public static void setTraceId(Message message, String traceId) throws JMSException {
STRING_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_TRACE_ID, traceId);
}

public static String getTraceId(Message message, String defaultValue) {
return STRING_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_TRACE_ID, defaultValue);
}

public static void setSpanId(Message message, Long spanId) throws JMSException {
LONG_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_SPAN_ID, spanId);
}

public static Long getSpanId(Message message, Long defaultValue) {
return LONG_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_SPAN_ID, defaultValue);
}

public static void setParentSpanId(Message message, Long parentSpanId) throws JMSException {
LONG_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_PARENT_SPAN_ID, parentSpanId);
}

public static Long getParentSpanId(Message message, Long defaultValue) {
return LONG_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_PARENT_SPAN_ID, defaultValue);
}

public static void setSampled(Message message, Boolean sampled) throws JMSException {
BOOLEAN_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_SAMPLED, sampled);
}

public static Boolean getSampled(Message message, Boolean defaultValue) {
return BOOLEAN_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_SAMPLED, defaultValue);
}

public static void setFlags(Message message, Short flags) throws JMSException {
SHORT_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_FLAGS, flags);
}

public static Short getFlags(Message message, Short defaultValue) {
return SHORT_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_FLAGS, defaultValue);
}

public static void setParentApplicationName(Message message, String parentApplicationName) throws JMSException {
STRING_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_PARENT_APPLICATION_NAME, parentApplicationName);
}

public static String getParentApplicationName(Message message, String defaultValue) {
return STRING_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_PARENT_APPLICATION_NAME, defaultValue);
}

public static void setParentApplicationType(Message message, Short parentApplicationType) throws JMSException {
SHORT_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_PARENT_APPLICATION_TYPE, parentApplicationType);
}

public static Short getParentApplicationType(Message message, Short defaultValue) {
return SHORT_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_PARENT_APPLICATION_TYPE, defaultValue);
}

// public static void setHost(Message message, String host) throws JMSException {
// STRING_MESSAGE_HANDLER.setMessage(message, ACTIVEMQ_HOST, host);
// }
//
// public static String getHost(Message message, String defaultValue) {
// return STRING_MESSAGE_HANDLER.getMessage(message, ACTIVEMQ_HOST, defaultValue);
// }

}
Loading

0 comments on commit 0317a98

Please # to comment.