From ec9c0368b0d84582a2e3d4897d831c237e96aced Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 25 Apr 2024 15:55:48 -0700 Subject: [PATCH 1/6] Add Schema Registry metrics --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../service/schema/SchemaRegistryService.java | 6 +- .../schema/SchemaRegistryServiceImpl.java | 37 +++-- .../service/schema/SchemaRegistryStats.java | 146 +++++++++++++++--- .../apache/pulsar/TestNGInstanceOrder.java | 38 +++++ .../service/schema/SchemaServiceTest.java | 119 ++++++++++---- .../pulsar/client/api/SimpleSchemaTest.java | 3 + .../OpenTelemetryAttributes.java | 5 + 8 files changed, 284 insertions(+), 72 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 96f3653ea9966..aba8539543cbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -812,7 +812,7 @@ public void start() throws PulsarServerException { schemaStorage = createAndStartSchemaStorage(); schemaRegistryService = SchemaRegistryService.create( - schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor); + schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this); OffloadPoliciesImpl defaultOffloadPolicies = OffloadPoliciesImpl.create(this.getConfiguration().getProperties()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java index 3c5e3aae7ff5d..2a2467d3947ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -21,7 +21,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.schema.SchemaType; @@ -44,13 +44,13 @@ static Map getCheckers(Set checker } static SchemaRegistryService create(SchemaStorage schemaStorage, Set schemaRegistryCompatibilityCheckers, - ScheduledExecutorService scheduler) { + PulsarService pulsarService) { if (schemaStorage != null) { try { Map checkers = getCheckers(schemaRegistryCompatibilityCheckers); checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers)); return SchemaRegistryServiceWithSchemaDataValidator.of( - new SchemaRegistryServiceImpl(schemaStorage, checkers, scheduler)); + new SchemaRegistryServiceImpl(schemaStorage, checkers, pulsarService)); } catch (Exception e) { LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index ae56df248d85d..843c250e2aa38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; @@ -47,6 +46,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; @@ -70,19 +70,19 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { @VisibleForTesting SchemaRegistryServiceImpl(SchemaStorage schemaStorage, - Map compatibilityChecks, Clock clock, - ScheduledExecutorService scheduler) { + Map compatibilityChecks, + Clock clock, + PulsarService pulsarService) { this.schemaStorage = schemaStorage; this.compatibilityChecks = compatibilityChecks; this.clock = clock; - this.stats = SchemaRegistryStats.getInstance(scheduler); + this.stats = new SchemaRegistryStats(pulsarService); } - @VisibleForTesting SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map compatibilityChecks, - ScheduledExecutorService scheduler) { - this(schemaStorage, compatibilityChecks, Clock.systemUTC(), scheduler); + PulsarService pulsarService) { + this(schemaStorage, compatibilityChecks, Clock.systemUTC(), pulsarService); } @Override @@ -136,16 +136,17 @@ public CompletableFuture getSchema(String schemaId, SchemaVer } }) .whenComplete((v, t) -> { + var latencyMs = this.clock.millis() - start; if (t != null) { if (log.isDebugEnabled()) { log.debug("[{}] Get schema failed", schemaId); } - this.stats.recordGetFailed(schemaId); + this.stats.recordGetFailed(schemaId, latencyMs); } else { if (log.isDebugEnabled()) { log.debug(null == v ? "[{}] Schema not found" : "[{}] Schema is present", schemaId); } - this.stats.recordGetLatency(schemaId, this.clock.millis() - start); + this.stats.recordGetLatency(schemaId, latencyMs); } }); } @@ -157,10 +158,11 @@ public CompletableFuture>> getAllSchem return schemaStorage.getAll(schemaId) .thenCompose(schemas -> convertToSchemaAndMetadata(schemaId, schemas)) .whenComplete((v, t) -> { + var latencyMs = this.clock.millis() - start; if (t != null) { - this.stats.recordGetFailed(schemaId); + this.stats.recordListFailed(schemaId, latencyMs); } else { - this.stats.recordGetLatency(schemaId, this.clock.millis() - start); + this.stats.recordListLatency(schemaId, latencyMs); } }); } @@ -228,10 +230,11 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context)); }); }))).whenComplete((v, ex) -> { + var latencyMs = this.clock.millis() - start.getValue(); if (ex != null) { log.error("[{}] Put schema failed", schemaId, ex); if (start.getValue() != 0) { - this.stats.recordPutFailed(schemaId); + this.stats.recordPutFailed(schemaId, latencyMs); } promise.completeExceptionally(ex); } else { @@ -261,14 +264,15 @@ public CompletableFuture deleteSchema(String schemaId, String use return schemaStorage .put(schemaId, deletedEntry, new byte[]{}) .whenComplete((v, t) -> { + var latencyMs = this.clock.millis() - start; if (t != null) { log.error("[{}] User {} delete schema failed", schemaId, user); - this.stats.recordDelFailed(schemaId); + this.stats.recordDelFailed(schemaId, latencyMs); } else { if (log.isDebugEnabled()) { log.debug("[{}] User {} delete schema finished", schemaId, user); } - this.stats.recordDelLatency(schemaId, this.clock.millis() - start); + this.stats.recordDelLatency(schemaId, latencyMs); } }); } @@ -284,11 +288,12 @@ public CompletableFuture deleteSchemaStorage(String schemaId, boo return schemaStorage.delete(schemaId, forcefully) .whenComplete((v, t) -> { + var latencyMs = this.clock.millis() - start; if (t != null) { - this.stats.recordDelFailed(schemaId); + this.stats.recordDelFailed(schemaId, latencyMs); log.error("[{}] Delete schema storage failed", schemaId); } else { - this.stats.recordDelLatency(schemaId, this.clock.millis() - start); + this.stats.recordDelLatency(schemaId, latencyMs); if (log.isDebugEnabled()) { log.debug("[{}] Delete schema storage finished", schemaId); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java index 32e9e36853026..c6c4aedb3c328 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java @@ -18,47 +18,93 @@ */ package org.apache.pulsar.broker.service.schema; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Summary; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.stats.MetricsUtil; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; class SchemaRegistryStats implements AutoCloseable, Runnable { private static final String NAMESPACE = "namespace"; private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1}; - private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + public static final AttributeKey REQUEST_TYPE_KEY = + AttributeKey.stringKey("pulsar.schema_registry.request"); + @VisibleForTesting + enum RequestType { + GET, + LIST, + PUT, + DELETE; + + public final Attributes attributes = Attributes.of(REQUEST_TYPE_KEY, name().toLowerCase()); + } + + public static final AttributeKey RESPONSE_TYPE_KEY = + AttributeKey.stringKey("pulsar.schema_registry.response"); + @VisibleForTesting + enum ResponseType { + SUCCESS, + FAILURE; + + public final Attributes attributes = Attributes.of(RESPONSE_TYPE_KEY, name().toLowerCase()); + } + + public static final AttributeKey COMPATIBILITY_CHECK_RESPONSE_KEY = + AttributeKey.stringKey("pulsar.schema_registry.compatibility_check.response"); + @VisibleForTesting + enum CompatibilityCheckResponse { + COMPATIBLE, + INCOMPATIBLE; + + public final Attributes attributes = Attributes.of(COMPATIBILITY_CHECK_RESPONSE_KEY, name().toLowerCase()); + } + + public static final String SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME = + "pulsar.broker.request.schema_registry.duration"; + private final DoubleHistogram latencyHistogram; + + public static final String COMPATIBLE_COUNTER_METRIC_NAME = + "pulsar.broker.operation.schema_registry.compatibility_check.count"; + private final LongCounter schemaCompatibilityCounter; + + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private final Counter getOpsFailedCounter; + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private final Counter putOpsFailedCounter; + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private final Counter deleteOpsFailedCounter; + @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME) private final Counter compatibleCounter; + @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME) private final Counter incompatibleCounter; + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private final Summary deleteOpsLatency; + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private final Summary getOpsLatency; + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private final Summary putOpsLatency; - private final Map namespaceAccess = new ConcurrentHashMap<>(); - private ScheduledFuture future; - - private static volatile SchemaRegistryStats instance; - - static synchronized SchemaRegistryStats getInstance(ScheduledExecutorService scheduler) { - if (null == instance) { - instance = new SchemaRegistryStats(scheduler); - } + private boolean closed; - return instance; - } + private final Map namespaceAccess = new ConcurrentHashMap<>(); + private final ScheduledFuture future; - private SchemaRegistryStats(ScheduledExecutorService scheduler) { + public SchemaRegistryStats(PulsarService pulsarService) { this.deleteOpsFailedCounter = Counter.build("pulsar_schema_del_ops_failed_total", "-") .labelNames(NAMESPACE).create().register(); this.getOpsFailedCounter = Counter.build("pulsar_schema_get_ops_failed_total", "-") @@ -75,9 +121,19 @@ private SchemaRegistryStats(ScheduledExecutorService scheduler) { this.getOpsLatency = this.buildSummary("pulsar_schema_get_ops_latency", "-"); this.putOpsLatency = this.buildSummary("pulsar_schema_put_ops_latency", "-"); - if (null != scheduler) { - this.future = scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES); - } + this.closed = false; + + this.future = pulsarService.getExecutor().scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES); + + var meter = pulsarService.getOpenTelemetry().getMeter(); + latencyHistogram = meter.histogramBuilder(SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) + .setDescription("The duration of Schema Registry requests.") + .setUnit("s") + .build(); + schemaCompatibilityCounter = meter.counterBuilder(COMPATIBLE_COUNTER_METRIC_NAME) + .setDescription("The number of Schema Registry compatibility check operations performed by the broker.") + .setUnit("{operation}") + .build(); } private Summary buildSummary(String name, String help) { @@ -90,38 +146,77 @@ private Summary buildSummary(String name, String help) { return builder.create().register(); } - void recordDelFailed(String schemaId) { + void recordDelFailed(String schemaId, long millis) { this.deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + recordOperationLatency(schemaId, millis, RequestType.DELETE, ResponseType.FAILURE); + } + + void recordGetFailed(String schemaId, long millis) { + this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + recordOperationLatency(schemaId, millis, RequestType.GET, ResponseType.FAILURE); } - void recordGetFailed(String schemaId) { + void recordListFailed(String schemaId, long millis) { this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + recordOperationLatency(schemaId, millis, RequestType.LIST, ResponseType.FAILURE); } - void recordPutFailed(String schemaId) { + void recordPutFailed(String schemaId, long millis) { this.putOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + recordOperationLatency(schemaId, millis, RequestType.PUT, ResponseType.FAILURE); } void recordDelLatency(String schemaId, long millis) { this.deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis); + recordOperationLatency(schemaId, millis, RequestType.DELETE, ResponseType.SUCCESS); } void recordGetLatency(String schemaId, long millis) { this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis); + recordOperationLatency(schemaId, millis, RequestType.GET, ResponseType.SUCCESS); + } + + void recordListLatency(String schemaId, long millis) { + this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis); + recordOperationLatency(schemaId, millis, RequestType.LIST, ResponseType.SUCCESS); } void recordPutLatency(String schemaId, long millis) { this.putOpsLatency.labels(getNamespace(schemaId)).observe(millis); + recordOperationLatency(schemaId, millis, RequestType.PUT, ResponseType.SUCCESS); + } + + private void recordOperationLatency(String schemaId, long millis, + RequestType requestType, ResponseType responseType) { + var duration = MetricsUtil.convertToSeconds(millis, TimeUnit.MILLISECONDS); + var namespace = getNamespace(schemaId); + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, namespace) + .putAll(requestType.attributes) + .putAll(responseType.attributes) + .build(); + latencyHistogram.record(duration, attributes); } void recordSchemaIncompatible(String schemaId) { - this.incompatibleCounter.labels(getNamespace(schemaId)).inc(); + var namespace = getNamespace(schemaId); + this.incompatibleCounter.labels(namespace).inc(); + recordSchemaCompabilityResult(namespace, CompatibilityCheckResponse.INCOMPATIBLE); } void recordSchemaCompatible(String schemaId) { - this.compatibleCounter.labels(getNamespace(schemaId)).inc(); + var namespace = getNamespace(schemaId); + this.compatibleCounter.labels(namespace).inc(); + recordSchemaCompabilityResult(namespace, CompatibilityCheckResponse.COMPATIBLE); } + private void recordSchemaCompabilityResult(String namespace, CompatibilityCheckResponse result) { + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, namespace) + .putAll(result.attributes) + .build(); + schemaCompatibilityCounter.add(1, attributes); + } private String getNamespace(String schemaId) { String namespace; @@ -148,8 +243,8 @@ private void removeChild(String namespace) { } @Override - public void close() throws Exception { - if (CLOSED.compareAndSet(false, true)) { + public synchronized void close() throws Exception { + if (!closed) { CollectorRegistry.defaultRegistry.unregister(this.deleteOpsFailedCounter); CollectorRegistry.defaultRegistry.unregister(this.getOpsFailedCounter); CollectorRegistry.defaultRegistry.unregister(this.putOpsFailedCounter); @@ -161,6 +256,7 @@ public void close() throws Exception { if (null != this.future) { this.future.cancel(false); } + closed = true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java b/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java new file mode 100644 index 0000000000000..50c9863d586ec --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar; + +import java.util.Comparator; +import java.util.List; +import org.testng.IMethodInstance; +import org.testng.IMethodInterceptor; +import org.testng.ITestContext; + +// Sorts the test methods by test object instance hashcode, then priority, then method name. Useful when Factory +// generated tests interfere with each other. +public class TestNGInstanceOrder implements IMethodInterceptor { + @Override + public List intercept(List methods, ITestContext context) { + return methods.stream().sorted(Comparator.comparingInt(o -> o.getInstance().hashCode()) + .thenComparingInt(o -> o.getMethod().getInterceptedPriority()) + .thenComparingInt(o -> o.getMethod().getPriority()) + .thenComparing(o -> o.getMethod().getMethodName())) + .toList(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index 3a4016eb79c21..49a46ab7a89b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -18,17 +18,20 @@ */ package org.apache.pulsar.broker.service.schema; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; +import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD; +import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertNull; -import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Multimap; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import io.opentelemetry.api.common.Attributes; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -39,15 +42,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.common.naming.TopicName; @@ -60,6 +66,8 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -68,7 +76,7 @@ @Test(groups = "broker") public class SchemaServiceTest extends MockedPulsarServiceBaseTest { - private static final Clock MockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault()); + private static final Clock MockClock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); private final String schemaId1 = "1/2/3/4"; private static final String userId = "user"; @@ -97,14 +105,30 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"); super.internalSetup(); + // Unregister the Prometheus metrics from the default collector. + pulsar.getSchemaRegistryService().close(); + BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); storage.start(); Map checkMap = new HashMap<>(); checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck()); - schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, null); + schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, pulsar); + + var schemaRegistryStats = + Mockito.spy((SchemaRegistryStats) FieldUtils.readField(schemaRegistryService, "stats", true)); + // Disable periodic cleanup of Prometheus entries. + Mockito.doNothing().when(schemaRegistryStats).run(); + FieldUtils.writeField(schemaRegistryService, "stats", schemaRegistryStats, true); + setupDefaultTenantAndNamespace(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + @AfterMethod(alwaysRun = true) @Override protected void cleanup() throws Exception { @@ -120,6 +144,32 @@ public void testSchemaRegistryMetrics() throws Exception { getSchema(schemaId, version(0)); deleteSchema(schemaId, version(1)); + var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertThat(otelMetrics).anySatisfy(metric -> assertThat(metric) + .hasName(SchemaRegistryStats.SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) + .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying( + point -> point + .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns", + SchemaRegistryStats.REQUEST_TYPE_KEY, "delete", + SchemaRegistryStats.RESPONSE_TYPE_KEY, "success")) + .hasCount(1), + point -> point + .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns", + SchemaRegistryStats.REQUEST_TYPE_KEY, "put", + SchemaRegistryStats.RESPONSE_TYPE_KEY, "success")) + .hasCount(1), + point -> point + .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns", + SchemaRegistryStats.REQUEST_TYPE_KEY, "list", + SchemaRegistryStats.RESPONSE_TYPE_KEY, "success")) + .hasCount(1), + point -> point + .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns", + SchemaRegistryStats.REQUEST_TYPE_KEY, "get", + SchemaRegistryStats.RESPONSE_TYPE_KEY, "success")) + .hasCount(1) + ))); + ByteArrayOutputStream output = new ByteArrayOutputStream(); PrometheusMetricsTestUtil.generate(pulsar, false, false, false, output); output.flush(); @@ -311,16 +361,38 @@ public void dontReAddExistingSchemaInMiddle() throws Exception { putSchema(schemaId1, schemaData2, version(1)); } - @Test(expectedExceptions = ExecutionException.class) + @Test public void checkIsCompatible() throws Exception { - putSchema(schemaId1, schemaData1, version(0), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); - putSchema(schemaId1, schemaData2, version(1), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); - - assertTrue(schemaRegistryService.isCompatible(schemaId1, schemaData3, - SchemaCompatibilityStrategy.BACKWARD).get()); - assertFalse(schemaRegistryService.isCompatible(schemaId1, schemaData3, - SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE).get()); - putSchema(schemaId1, schemaData3, version(2), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); + var schemaId = BrokerTestUtil.newUniqueName("tenant/ns/topic"); + putSchema(schemaId, schemaData1, version(0), BACKWARD_TRANSITIVE); + putSchema(schemaId, schemaData2, version(1), BACKWARD_TRANSITIVE); + + assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3, BACKWARD)) + .isCompletedWithValue(true); + assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3, BACKWARD_TRANSITIVE)) + .failsWithin(1, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(IncompatibleSchemaException.class); + assertThatThrownBy(() -> + putSchema(schemaId, schemaData3, version(2), BACKWARD_TRANSITIVE)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(IncompatibleSchemaException.class); + + assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics()) + .anySatisfy(metric -> assertThat(metric) + .hasName(SchemaRegistryStats.COMPATIBLE_COUNTER_METRIC_NAME) + .hasLongSumSatisfying( + sum -> sum.hasPointsSatisfying( + point -> point + .hasAttributes(Attributes.of( + OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns", + SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, "compatible")) + .hasValue(2), + point -> point + .hasAttributes(Attributes.of( + OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns", + SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, "incompatible")) + .hasValue(2)))); } @Test @@ -376,20 +448,13 @@ private void deleteSchema(String schemaId, SchemaVersion expectedVersion) throws assertEquals(expectedVersion, version); } - private SchemaData randomSchema() { - UUID randomString = UUID.randomUUID(); - return SchemaData.builder() - .user(userId) - .type(SchemaType.JSON) - .timestamp(MockClock.millis()) - .isDeleted(false) - .data(randomString.toString().getBytes()) - .props(new TreeMap<>()) - .build(); - } - private static SchemaData getSchemaData(String schemaJson) { - return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build(); + return SchemaData.builder() + .data(schemaJson.getBytes()) + .type(SchemaType.AVRO) + .user(userId) + .timestamp(MockClock.millis()) + .build(); } private SchemaVersion version(long version) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index c8c7c3b2ccc38..e006b72fad279 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema.Parser; import org.apache.avro.reflect.ReflectData; +import org.apache.pulsar.TestNGInstanceOrder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException; @@ -66,10 +67,12 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; +import org.testng.annotations.Listeners; import org.testng.annotations.Test; @Test(groups = "broker-api") @Slf4j +@Listeners({ TestNGInstanceOrder.class }) public class SimpleSchemaTest extends ProducerConsumerBase { private static final String NAMESPACE = "my-property/my-ns"; diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index bdb002cb359ff..5dc0370f48333 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -29,4 +29,9 @@ public interface OpenTelemetryAttributes { * {@link OpenTelemetryService}. */ AttributeKey PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster"); + + /** + * The name of the Pulsar namespace. + */ + AttributeKey PULSAR_NAMESPACE = AttributeKey.stringKey("pulsar.namespace"); } From 7b66a74995e946458d9ab401a86e656a5c2236e2 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 29 May 2024 14:47:06 -0700 Subject: [PATCH 2/6] Use static Prometheus counters in SchemaRegistryStats --- .../service/schema/SchemaRegistryStats.java | 75 +++++++------------ 1 file changed, 28 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java index c6c4aedb3c328..f8134ffd1e25b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java @@ -23,7 +23,6 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; -import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Summary; import java.util.Map; @@ -81,23 +80,30 @@ enum CompatibilityCheckResponse { private final LongCounter schemaCompatibilityCounter; @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) - private final Counter getOpsFailedCounter; + private static final Counter getOpsFailedCounter = + Counter.build("pulsar_schema_get_ops_failed_total", "-").labelNames(NAMESPACE).create().register(); @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) - private final Counter putOpsFailedCounter; + private static final Counter putOpsFailedCounter = + Counter.build("pulsar_schema_put_ops_failed_total", "-").labelNames(NAMESPACE).create().register(); @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) - private final Counter deleteOpsFailedCounter; + private static final Counter deleteOpsFailedCounter = + Counter.build("pulsar_schema_del_ops_failed_total", "-").labelNames(NAMESPACE).create().register(); @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME) - private final Counter compatibleCounter; + private static final Counter compatibleCounter = + Counter.build("pulsar_schema_compatible_total", "-").labelNames(NAMESPACE).create().register(); @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME) - private final Counter incompatibleCounter; + private static final Counter incompatibleCounter = + Counter.build("pulsar_schema_incompatible_total", "-").labelNames(NAMESPACE).create().register(); @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) - private final Summary deleteOpsLatency; + private static final Summary deleteOpsLatency = buildSummary("pulsar_schema_del_ops_latency", "-"); + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) - private final Summary getOpsLatency; + private static final Summary getOpsLatency = buildSummary("pulsar_schema_get_ops_latency", "-"); + @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) - private final Summary putOpsLatency; + private static final Summary putOpsLatency = buildSummary("pulsar_schema_put_ops_latency", "-"); private boolean closed; @@ -105,22 +111,6 @@ enum CompatibilityCheckResponse { private final ScheduledFuture future; public SchemaRegistryStats(PulsarService pulsarService) { - this.deleteOpsFailedCounter = Counter.build("pulsar_schema_del_ops_failed_total", "-") - .labelNames(NAMESPACE).create().register(); - this.getOpsFailedCounter = Counter.build("pulsar_schema_get_ops_failed_total", "-") - .labelNames(NAMESPACE).create().register(); - this.putOpsFailedCounter = Counter.build("pulsar_schema_put_ops_failed_total", "-") - .labelNames(NAMESPACE).create().register(); - - this.compatibleCounter = Counter.build("pulsar_schema_compatible_total", "-") - .labelNames(NAMESPACE).create().register(); - this.incompatibleCounter = Counter.build("pulsar_schema_incompatible_total", "-") - .labelNames(NAMESPACE).create().register(); - - this.deleteOpsLatency = this.buildSummary("pulsar_schema_del_ops_latency", "-"); - this.getOpsLatency = this.buildSummary("pulsar_schema_get_ops_latency", "-"); - this.putOpsLatency = this.buildSummary("pulsar_schema_put_ops_latency", "-"); - this.closed = false; this.future = pulsarService.getExecutor().scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES); @@ -136,7 +126,7 @@ public SchemaRegistryStats(PulsarService pulsarService) { .build(); } - private Summary buildSummary(String name, String help) { + private static Summary buildSummary(String name, String help) { Summary.Builder builder = Summary.build(name, help).labelNames(NAMESPACE); for (double quantile : QUANTILES) { @@ -147,42 +137,42 @@ private Summary buildSummary(String name, String help) { } void recordDelFailed(String schemaId, long millis) { - this.deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc(); recordOperationLatency(schemaId, millis, RequestType.DELETE, ResponseType.FAILURE); } void recordGetFailed(String schemaId, long millis) { - this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + getOpsFailedCounter.labels(getNamespace(schemaId)).inc(); recordOperationLatency(schemaId, millis, RequestType.GET, ResponseType.FAILURE); } void recordListFailed(String schemaId, long millis) { - this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + getOpsFailedCounter.labels(getNamespace(schemaId)).inc(); recordOperationLatency(schemaId, millis, RequestType.LIST, ResponseType.FAILURE); } void recordPutFailed(String schemaId, long millis) { - this.putOpsFailedCounter.labels(getNamespace(schemaId)).inc(); + putOpsFailedCounter.labels(getNamespace(schemaId)).inc(); recordOperationLatency(schemaId, millis, RequestType.PUT, ResponseType.FAILURE); } void recordDelLatency(String schemaId, long millis) { - this.deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis); + deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis); recordOperationLatency(schemaId, millis, RequestType.DELETE, ResponseType.SUCCESS); } void recordGetLatency(String schemaId, long millis) { - this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis); + getOpsLatency.labels(getNamespace(schemaId)).observe(millis); recordOperationLatency(schemaId, millis, RequestType.GET, ResponseType.SUCCESS); } void recordListLatency(String schemaId, long millis) { - this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis); + getOpsLatency.labels(getNamespace(schemaId)).observe(millis); recordOperationLatency(schemaId, millis, RequestType.LIST, ResponseType.SUCCESS); } void recordPutLatency(String schemaId, long millis) { - this.putOpsLatency.labels(getNamespace(schemaId)).observe(millis); + putOpsLatency.labels(getNamespace(schemaId)).observe(millis); recordOperationLatency(schemaId, millis, RequestType.PUT, ResponseType.SUCCESS); } @@ -200,13 +190,13 @@ private void recordOperationLatency(String schemaId, long millis, void recordSchemaIncompatible(String schemaId) { var namespace = getNamespace(schemaId); - this.incompatibleCounter.labels(namespace).inc(); + incompatibleCounter.labels(namespace).inc(); recordSchemaCompabilityResult(namespace, CompatibilityCheckResponse.INCOMPATIBLE); } void recordSchemaCompatible(String schemaId) { var namespace = getNamespace(schemaId); - this.compatibleCounter.labels(namespace).inc(); + compatibleCounter.labels(namespace).inc(); recordSchemaCompabilityResult(namespace, CompatibilityCheckResponse.COMPATIBLE); } @@ -245,17 +235,8 @@ private void removeChild(String namespace) { @Override public synchronized void close() throws Exception { if (!closed) { - CollectorRegistry.defaultRegistry.unregister(this.deleteOpsFailedCounter); - CollectorRegistry.defaultRegistry.unregister(this.getOpsFailedCounter); - CollectorRegistry.defaultRegistry.unregister(this.putOpsFailedCounter); - CollectorRegistry.defaultRegistry.unregister(this.compatibleCounter); - CollectorRegistry.defaultRegistry.unregister(this.incompatibleCounter); - CollectorRegistry.defaultRegistry.unregister(this.deleteOpsLatency); - CollectorRegistry.defaultRegistry.unregister(this.getOpsLatency); - CollectorRegistry.defaultRegistry.unregister(this.putOpsLatency); - if (null != this.future) { - this.future.cancel(false); - } + namespaceAccess.keySet().forEach(this::removeChild); + future.cancel(false); closed = true; } } From 28b5055482cc1ec42aa3fc4cad6efc2bee487622 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 29 May 2024 15:46:46 -0700 Subject: [PATCH 3/6] Allow repeated closure of SchemaRegistryService --- .../broker/service/schema/SchemaRegistryStats.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java index f8134ffd1e25b..b1a7dc2a54133 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java @@ -105,14 +105,10 @@ enum CompatibilityCheckResponse { @PulsarDeprecatedMetric(newMetricName = SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME) private static final Summary putOpsLatency = buildSummary("pulsar_schema_put_ops_latency", "-"); - private boolean closed; - private final Map namespaceAccess = new ConcurrentHashMap<>(); private final ScheduledFuture future; public SchemaRegistryStats(PulsarService pulsarService) { - this.closed = false; - this.future = pulsarService.getExecutor().scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES); var meter = pulsarService.getOpenTelemetry().getMeter(); @@ -234,11 +230,8 @@ private void removeChild(String namespace) { @Override public synchronized void close() throws Exception { - if (!closed) { - namespaceAccess.keySet().forEach(this::removeChild); - future.cancel(false); - closed = true; - } + namespaceAccess.keySet().forEach(this::removeChild); + future.cancel(false); } @Override From 73d89275f055ed83734d17f1fba9f062de994198 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 30 May 2024 09:44:27 -0700 Subject: [PATCH 4/6] Fix SchemaServiceTest.checkIsCompatible not waiting for future completion --- .../broker/service/schema/SchemaServiceTest.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index ae6a7992bdd10..d6baffd48b16b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -34,6 +34,7 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; @@ -44,7 +45,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.PrometheusMetricsTestUtil; import org.apache.pulsar.broker.BrokerTestUtil; @@ -65,6 +65,7 @@ import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.assertj.core.api.InstanceOfAssertFactories; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -365,14 +366,15 @@ public void checkIsCompatible() throws Exception { putSchema(schemaId, schemaData1, version(0), BACKWARD_TRANSITIVE); putSchema(schemaId, schemaData2, version(1), BACKWARD_TRANSITIVE); + var timeout = Duration.ofSeconds(1); assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3, BACKWARD)) - .isCompletedWithValue(true); + .succeedsWithin(timeout, InstanceOfAssertFactories.BOOLEAN) + .isTrue(); assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3, BACKWARD_TRANSITIVE)) - .failsWithin(1, TimeUnit.SECONDS) + .failsWithin(timeout) .withThrowableOfType(ExecutionException.class) .withCauseInstanceOf(IncompatibleSchemaException.class); - assertThatThrownBy(() -> - putSchema(schemaId, schemaData3, version(2), BACKWARD_TRANSITIVE)) + assertThatThrownBy(() -> putSchema(schemaId, schemaData3, version(2), BACKWARD_TRANSITIVE)) .isInstanceOf(ExecutionException.class) .hasCauseInstanceOf(IncompatibleSchemaException.class); From 60e5ea76671f48c30a8aa18f1c3159098cb5f1e0 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 30 May 2024 09:44:45 -0700 Subject: [PATCH 5/6] Remove redundant closing of SR stats in test --- .../apache/pulsar/broker/service/schema/SchemaServiceTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index d6baffd48b16b..b05731080e8cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -104,8 +104,6 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"); super.internalSetup(); - // Unregister the Prometheus metrics from the default collector. - pulsar.getSchemaRegistryService().close(); BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); storage.start(); From d5c746bd45eff0587a4f4247b77bc0ce190944a6 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 30 May 2024 10:10:01 -0700 Subject: [PATCH 6/6] Cosmetic fixes --- .../apache/pulsar/broker/service/schema/SchemaServiceTest.java | 1 - .../apache/pulsar/opentelemetry/OpenTelemetryAttributes.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index b05731080e8cc..658ea268c644c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -104,7 +104,6 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"); super.internalSetup(); - BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); storage.start(); Map checkMap = new HashMap<>(); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 07c8fc74661f2..4f898b382e633 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -26,8 +26,7 @@ */ public interface OpenTelemetryAttributes { /** - * The name of the Pulsar cluster. This attribute is automatically added to - * all signals by + * The name of the Pulsar cluster. This attribute is automatically added to all signals by * {@link OpenTelemetryService}. */ AttributeKey PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster");