Skip to content

Commit

Permalink
Kafka update
Browse files Browse the repository at this point in the history
  • Loading branch information
mjok committed Nov 25, 2019
1 parent 3f8a2aa commit 58770f5
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public static void fillDefaultValuesAfter(EntryDefinition entryDefinition, long
entryDefinition.addProperty("PARENT", lastEntryDefinition.getId());
}
}
OutputManager.INSTANCE.send(entryDefinition);
if (!entryDefinition.isTransparent()) {
OutputManager.INSTANCE.send(entryDefinition);
}
}

public static void handleInstrumentedMethodException(EntryDefinition entryDefinition, Throwable exception,
Expand Down Expand Up @@ -140,7 +142,9 @@ public static long fillDefaultValuesBefore(EntryDefinition entryDefinition,
if (sendStackTrace) {
entryDefinition.setStackTrace(getStackTrace());
}
OutputManager.INSTANCE.send(entryDefinition);
if (!entryDefinition.isTransparent()) {
OutputManager.INSTANCE.send(entryDefinition);
}
} catch (Throwable t) {
if (logger != null) {
logger.info(format("####Advice error/fillDefaultValuesBefore: {0}", t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class EntryDefinition extends AbstractMarshallable {
Mode mode = Mode.RUNNING;

String returnType;

boolean transparent;
String returnValue;
String exception;
String correlator;
Expand Down Expand Up @@ -186,6 +188,14 @@ public void setException(Throwable exception) {
setExceptionTrace(stringWriter.toString());
}

public boolean isTransparent() {
return transparent;
}

public void setTransparent() {
transparent = true;
}

public enum EventType {
CALL, SEND, RECEIVE, OPEN, CLOSE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void before(// @Advice.This Object thiz, //
ed.setEventType(EntryDefinition.EventType.RECEIVE);
ed.addPropertyIfExist("TOPIC", topic);
ed.setResource(topic, EntryDefinition.ResourceType.QUEUE);
ed.setApplication("KAFKA");

ed.addPropertyIfExist("PARTITION", partition);
ed.addPropertyIfExist("OFFSET", offset);
ed.addPropertyIfExist("TIMESTAMP", timestamp);
Expand All @@ -113,6 +113,7 @@ public static void before(// @Advice.This Object thiz, //
/* startTime = */ fillDefaultValuesBefore(ed, stackThreadLocal, null, null, logging ? logger : null)//
);
ed.setEventType(EntryDefinition.EventType.RECEIVE);
ed.setApplication("KAFKA");

} catch (Throwable t) {
handleAdviceException(t, ADVICE_NAME + "start", logging ? logger : null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package com.jkoolcloud.remora.advices;

import static com.jkoolcloud.remora.core.utils.ReflectionUtils.getFieldValue;
import static net.bytebuddy.matcher.ElementMatchers.*;

import java.lang.instrument.Instrumentation;
import java.lang.reflect.Method;
import java.text.MessageFormat;

import org.apache.kafka.clients.consumer.Consumer;
import org.tinylog.Logger;
import org.tinylog.TaggedLogger;

import com.jkoolcloud.remora.RemoraConfig;
import com.jkoolcloud.remora.core.EntryDefinition;

import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KafkaConsumerClientAdvice extends BaseTransformers {

public static final String ADVICE_NAME = "KafkaConsumerClientAdvice";
public static String[] INTERCEPTING_CLASS = { "org.apache.kafka.clients.consumer.Consumer" };
public static String INTERCEPTING_METHOD = "poll";

@RemoraConfig.Configurable
public static boolean load = true;
@RemoraConfig.Configurable
public static boolean logging = false;
public static TaggedLogger logger;
static AgentBuilder.Transformer.ForAdvice advice = new AgentBuilder.Transformer.ForAdvice()
.include(KafkaConsumerClientAdvice.class.getClassLoader()).include(RemoraConfig.INSTANCE.classLoader)//
.advice(methodMatcher(), KafkaConsumerClientAdvice.class.getName());

/**
* Method matcher intended to match intercepted class method/s to instrument. See (@ElementMatcher) for available
* method matches.
*/

private static ElementMatcher<? super MethodDescription> methodMatcher() {
return named(INTERCEPTING_METHOD);
}

/**
* Advices before method is called before instrumented method code
*
* @param thiz
* reference to method object
* @param method
* instrumented method description
* @param ed
* {@link EntryDefinition} for collecting ant passing values to
* {@link com.jkoolcloud.remora.core.output.OutputManager}
* @param startTime
* method startTime
*
*/

@Advice.OnMethodEnter
public static void before(@Advice.This Consumer thiz, //
@Advice.Origin Method method, //
@Advice.Local("ed") EntryDefinition ed, //
@Advice.Local("startTime") long startTime) {
try {
if (ed == null) {
ed = new EntryDefinition(KafkaConsumerClientAdvice.class);
}
if (logging) {
logger.info(format("Entering: {0} {1}", KafkaConsumerClientAdvice.class.getName(), "before"));
}
ed.setTransparent();
startTime = fillDefaultValuesBefore(ed, stackThreadLocal, thiz, method, logging ? logger : null);

ed.setEventType(EntryDefinition.EventType.CALL);
String clientId = getFieldValue(thiz, String.class, "clientId");
String groupId = getFieldValue(thiz, String.class, "groupId");
ed.setApplication(MessageFormat.format("clientId={}, groupId={}", clientId, groupId));
} catch (Throwable t) {
handleAdviceException(t, ADVICE_NAME, logging ? logger : null);
}
}

/**
* Method called on instrumented method finished.
*
* @param method
* instrumented method description
* @param arguments
* arguments provided for method
* @param exception
* exception thrown in method exit (not caught)
* @param ed
* {@link EntryDefinition} passed along the method (from before method)
* @param startTime
* startTime passed along the method
*/

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(@Advice.This Consumer producer, //
@Advice.Origin Method method, //
@Advice.Thrown Throwable exception, @Advice.Local("ed") EntryDefinition ed, //
@Advice.Local("startTime") long startTime) {
boolean doFinally = true;
try {
if (ed == null) { // ed expected to be null if not created by entry, that's for duplicates
if (logging) {
logger.info("EntryDefinition not exist, entry might be filtered out as duplicate or ran on test");
}
doFinally = false;
return;
}
if (logging) {
logger.info(format("Exiting: {0} {1}", KafkaConsumerClientAdvice.class.getName(), "after"));
}
fillDefaultValuesAfter(ed, startTime, exception, logging ? logger : null);
} catch (Throwable t) {
handleAdviceException(t, ADVICE_NAME, logging ? logger : null);
} finally {
if (doFinally) {
doFinally();
}
}

}

/**
* Type matcher should find the class intended for instrumentation See (@ElementMatcher) for available matches.
*/

@Override
public ElementMatcher<TypeDescription> getTypeMatcher() {
return hasSuperType(named(INTERCEPTING_CLASS[0])).and(not(isInterface()));
}

@Override
public AgentBuilder.Transformer getAdvice() {
return advice;
}

@Override
protected AgentBuilder.Listener getListener() {
return new TransformationLoggingListener(logger);
}

@Override
public void install(Instrumentation instrumentation) {
logger = Logger.tag(ADVICE_NAME);
if (load) {
getTransform().with(getListener()).installOn(instrumentation);
} else {
logger.info("Advice {0} not enabled", getName());
}
}

@Override
public String getName() {
return ADVICE_NAME;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.jkoolcloud.remora.RemoraConfig;
import com.jkoolcloud.remora.core.EntryDefinition;
import com.jkoolcloud.remora.core.utils.ReflectionUtils;

import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -77,7 +78,8 @@ public static void before(@Advice.This KafkaProducer thiz, //
startTime = fillDefaultValuesBefore(ed, stackThreadLocal, thiz, method, logging ? logger : null);
ed.setEventType(EntryDefinition.EventType.SEND);
String topic = record.topic();
ed.setApplication("KAFKA");

ed.setApplication(ReflectionUtils.getFieldValue(thiz, String.class, "clientId"));
ed.addPropertyIfExist("TOPIC", topic);
ed.addPropertyIfExist("TIMESTAMP", record.timestamp());
ed.addPropertyIfExist("PARTITION", record.partition());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
com.jkoolcloud.remora.advices.KafkaConsumerAdvice
com.jkoolcloud.remora.advices.KafkaProducerAdvice
com.jkoolcloud.remora.advices.KafkaProducerAdvice
com.jkoolcloud.remora.advices.KafkaConsumerClientAdvice
15 changes: 13 additions & 2 deletions remora-stream/samples/remora-streamer/tnt-data-source.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
$fieldValue != null && $fieldValue.startsWith("ID:") ? $fieldValue - "ID:" : null
]]></field-transform>
</field>
<field name="all" locator="#"></field>
<field name="TIMESTAMP" datatype="Timestamp" units="Milliseconds">
<field-locator id="TNum" locator="TIMESTAMP" locator-type="Label" datatype="Number"/>
</field>

<field name="all" locator="#"/>
</parser>

<parser name="TokenParser" class="com.jkoolcloud.tnt4j.streams.parsers.ActivityJavaObjectParser">
Expand Down Expand Up @@ -47,10 +51,17 @@
<field name="ServerName" locator="server" locator-type="Label"/>

<embedded-activity name="properties" locator="properties" locator-type="Label">
<parser-ref name="propertiesParser"></parser-ref>
<parser-ref name="propertiesParser"/>
</embedded-activity>
<field name="EventType" locator="eventType" locator-type="Label"/>
<field name="MODE" locator="mode" locator-type="Label"/>
<field name="MessageAge" datatype="Number" value-type="age.usec">
<field-locator id="TimestampRAW" locator="TIMESTAMP" locator-type="Activity" datatype="AsInput">
<field-transform lang="groovy" phase="raw"><![CDATA[
$fieldValue != null ? ${StartTime} - $fieldValue : null
]]></field-transform>
</field-locator>
</field>
</parser>

<stream name="ChronicleStream" class="com.jkoolcloud.tnt4j.streams.inputs.ChronicleQueueStream">
Expand Down

0 comments on commit 58770f5

Please # to comment.