Skip to content

Commit

Permalink
Closes #681 - Add support for metric quantiles to the EUM server (#937)
Browse files Browse the repository at this point in the history
  • Loading branch information
Henning-Schulz authored Oct 19, 2020
1 parent 96a44ce commit 2adc897
Show file tree
Hide file tree
Showing 8 changed files with 1,122 additions and 11 deletions.
4 changes: 3 additions & 1 deletion components/inspectit-ocelot-eum-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ dependencies {
'commons-net:commons-net:3.3',
'org.springframework.boot:spring-boot-starter-actuator',
"rocks.inspectit:opencensus-influxdb-exporter:1.2",
"org.influxdb:influxdb-java:2.15"
"org.influxdb:influxdb-java:2.15",
"org.apache.commons:commons-lang3:3.+",
'org.apache.commons:commons-math3:3.6.1'
)
compileOnly "org.projectlombok:lombok:1.18.12"
annotationProcessor "org.projectlombok:lombok:1.18.12"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import rocks.inspectit.oce.eum.server.configuration.model.EumServerConfiguration;
import rocks.inspectit.oce.eum.server.metrics.percentiles.PercentileViewManager;
import rocks.inspectit.ocelot.config.model.exporters.metrics.InfluxExporterSettings;
import rocks.inspectit.opencensus.influx.InfluxExporter;

Expand All @@ -25,6 +26,9 @@ public class InfluxExporterService {
@Autowired
private ScheduledExecutorService executor;

@Autowired
private PercentileViewManager percentileViewManager;

@Autowired
private EumServerConfiguration config;

Expand Down Expand Up @@ -60,6 +64,7 @@ private void doEnable() {
.password(influx.getPassword())
.createDatabase(influx.isCreateDatabase())
.exportDifference(influx.isCountersAsDifferences())
.measurementNameProvider(percentileViewManager::getMeasureNameForSeries)
.bufferSize(influx.getBufferSize())
.build();
exporterTask = executor.scheduleAtFixedRate(activeExporter::export, 0, influx.getExportInterval()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rocks.inspectit.oce.eum.server.configuration.model.EumServerConfiguration;
import rocks.inspectit.oce.eum.server.metrics.percentiles.PercentileViewManager;
import rocks.inspectit.oce.eum.server.utils.TagUtils;
import rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings;
import rocks.inspectit.ocelot.config.model.metrics.definition.ViewDefinitionSettings;

import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

Expand All @@ -36,6 +38,9 @@ public class MeasuresAndViewsManager {
@Autowired
private ViewManager viewManager;

@Autowired
private PercentileViewManager percentileViewManager;

/**
* Records the measure.
*
Expand All @@ -60,14 +65,18 @@ public void recordMeasure(String measureName, MetricDefinitionSettings metricDef
.record();
break;
}

percentileViewManager.recordMeasurement(measureName, value.doubleValue(), Tags.getTagger()
.getCurrentTagContext());
}

/**
* Updates the metrics
*/
public void updateMetrics(String name, MetricDefinitionSettings metricDefinition) {
if (!metrics.containsKey(name)) {
MetricDefinitionSettings populatedMetricDefinition = metricDefinition.getCopyWithDefaultsPopulated(name);
MetricDefinitionSettings populatedMetricDefinition = metricDefinition.getCopyWithDefaultsPopulated(name, Duration
.ofSeconds(15)); // Default value of 15s will be overridden by configuration.
Measure measure = createMeasure(name, populatedMetricDefinition);
metrics.put(name, measure);
updateViews(name, populatedMetricDefinition);
Expand Down Expand Up @@ -98,26 +107,50 @@ private void updateViews(String metricName, MetricDefinitionSettings metricDefin
String viewName = viewDefinitionSettingsEntry.getKey();
ViewDefinitionSettings viewDefinitionSettings = viewDefinitionSettingsEntry.getValue();
if (viewManager.getAllExportedViews().stream().noneMatch(v -> v.getName().asString().equals(viewName))) {
Aggregation aggregation = createAggregation(viewDefinitionSettings);
List<TagKey> tagKeys = getTagsForView(viewDefinitionSettings).stream()
.map(TagKey::create)
.collect(Collectors.toList());
View view = View.create(View.Name.create(viewName), metricDefinition.getDescription(), metrics.get(metricName), aggregation, tagKeys);
viewManager.registerView(view);
Measure measure = metrics.get(metricName);

if (percentileViewManager.isViewRegistered(metricName, viewName) || viewDefinitionSettings.getAggregation() == ViewDefinitionSettings.Aggregation.QUANTILES) {
addPercentileView(measure, viewName, viewDefinitionSettings);
} else {
registerNewView(measure, viewName, viewDefinitionSettings);
}
}
}
}

private void addPercentileView(Measure measure, String viewName, ViewDefinitionSettings def) {
List<TagKey> viewTags = getTagKeysForView(def);
Set<String> tagsAsStrings = viewTags.stream().map(TagKey::getName).collect(Collectors.toSet());
boolean minEnabled = def.getQuantiles().contains(0.0);
boolean maxEnabled = def.getQuantiles().contains(1.0);
List<Double> percentilesFiltered = def.getQuantiles()
.stream()
.filter(p -> p > 0 && p < 1)
.collect(Collectors.toList());
percentileViewManager.createOrUpdateView(measure.getName(), viewName, measure.getUnit(), def.getDescription(), minEnabled, maxEnabled, percentilesFiltered, def
.getTimeWindow()
.toMillis(), tagsAsStrings, def.getMaxBufferedPoints());
}

private void registerNewView(Measure measure, String viewName, ViewDefinitionSettings def) {
Aggregation aggregation = createAggregation(def);
List<TagKey> tagKeys = getTagKeysForView(def);
View view = View.create(View.Name.create(viewName), def.getDescription(), measure, aggregation, tagKeys);
viewManager.registerView(view);
}

/**
* Returns all tags, which are exposed for the given metricDefinition
*/
private Set<String> getTagsForView(ViewDefinitionSettings viewDefinitionSettings) {
private List<TagKey> getTagKeysForView(ViewDefinitionSettings viewDefinitionSettings) {
Set<String> tags = new HashSet<>(configuration.getTags().getDefineAsGlobal());
tags.addAll(viewDefinitionSettings.getTags().entrySet().stream()
tags.addAll(viewDefinitionSettings.getTags()
.entrySet()
.stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.collect(Collectors.toList()));
return tags;
return tags.stream().map(TagKey::create).collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package rocks.inspectit.oce.eum.server.metrics.percentiles;

import com.google.common.annotations.VisibleForTesting;
import io.opencensus.common.Timestamp;
import io.opencensus.tags.TagContext;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue;

/**
* COPIED FROM THE OCELOT CORE PROJECT!
* <p>
* Consumer thread for asynchronously processing measurement observations.
*/
@Slf4j
class AsyncMetricRecorder {

private static final int QUEUE_CAPACITY = 8096;

private final MetricConsumer metricConsumer;

private volatile boolean overflowLogged = false;

private volatile boolean isDestroyed = false;

@VisibleForTesting
final ArrayBlockingQueue<MetricRecord> recordsQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

@VisibleForTesting
final Thread worker;

AsyncMetricRecorder(MetricConsumer consumer) {
metricConsumer = consumer;
worker = new Thread(this::doRecord);
worker.setDaemon(true);
worker.setName("InspectIT Ocelot percentile Recorder");
worker.start();
}

void record(String measureName, double value, Timestamp time, TagContext tags) {
boolean success = recordsQueue.offer(new MetricRecord(value, measureName, time, tags));
if (!success && !overflowLogged) {
overflowLogged = true;
log.warn("Measurement for percentiles has been dropped because queue is full. This message will not be shown for further drops!");
}
}

void destroy() {
isDestroyed = true;
worker.interrupt();
}

private void doRecord() {
while (true) {
try {
MetricRecord record = recordsQueue.take();
metricConsumer.record(record.measure, record.value, record.time, record.tagContext);
} catch (InterruptedException e) {
if (isDestroyed) {
return;
} else {
log.error("Unexpected interrupt", e);
}
} catch (Exception e) {
log.error("Error processing record: ", e);
}
}
}

public interface MetricConsumer {

void record(String measure, double value, Timestamp time, TagContext tags);
}

@Value
private static class MetricRecord {

double value;

String measure;

Timestamp time;

TagContext tagContext;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package rocks.inspectit.oce.eum.server.metrics.percentiles;

import io.opencensus.metrics.export.Metric;
import io.opencensus.metrics.export.MetricProducer;

import java.time.Duration;
import java.util.Collection;
import java.util.function.Supplier;

/**
* COPIED FROM THE OCELOT CORE PROJECT!
* <p>
* A metric producer which caches the metrics for a specified amount of time.
*/
public class CachingMetricProducer extends MetricProducer {

/**
* The function invoked to generate the metrics.
*/
private final Supplier<Collection<Metric>> computeMetricsFunction;

/**
* The duration for which cached metrics are kept.
*/
private final long cacheDurationNanos;

/**
* The timestamp when the metrics were computed the last time.
*/
private long cacheTimestamp;

private Collection<Metric> cachedMetrics = null;

/**
* Constructor.
*
* @param computeMetricsFunction the function to invoke for computing the metrics
* @param cacheDuration the duration for which the values shall be cached.
*/
public CachingMetricProducer(Supplier<Collection<Metric>> computeMetricsFunction, Duration cacheDuration) {
this.computeMetricsFunction = computeMetricsFunction;
cacheDurationNanos = cacheDuration.toNanos();
}

@Override
public synchronized Collection<Metric> getMetrics() {
long now = System.nanoTime();
if (cachedMetrics == null || (now - cacheTimestamp) > cacheDurationNanos) {
cachedMetrics = computeMetricsFunction.get();
cacheTimestamp = now;
}
return cachedMetrics;
}
}
Loading

0 comments on commit 2adc897

Please # to comment.