Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[feat][broker] PIP-264: Add schema registry metrics #22624

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ public void start() throws PulsarServerException {

schemaStorage = createAndStartSchemaStorage();
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor);
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this);

OffloadPoliciesImpl defaultOffloadPolicies =
OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.schema.SchemaType;
Expand All @@ -44,13 +44,13 @@ static Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> checker
}

static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> schemaRegistryCompatibilityCheckers,
ScheduledExecutorService scheduler) {
PulsarService pulsarService) {
if (schemaStorage != null) {
try {
Map<SchemaType, SchemaCompatibilityCheck> checkers = getCheckers(schemaRegistryCompatibilityCheckers);
checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
return SchemaRegistryServiceWithSchemaDataValidator.of(
new SchemaRegistryServiceImpl(schemaStorage, checkers, scheduler));
new SchemaRegistryServiceImpl(schemaStorage, checkers, pulsarService));
} catch (Exception e) {
LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -47,6 +46,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
Expand All @@ -70,19 +70,19 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {

@VisibleForTesting
SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks, Clock clock,
ScheduledExecutorService scheduler) {
Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks,
Clock clock,
PulsarService pulsarService) {
this.schemaStorage = schemaStorage;
this.compatibilityChecks = compatibilityChecks;
this.clock = clock;
this.stats = SchemaRegistryStats.getInstance(scheduler);
this.stats = new SchemaRegistryStats(pulsarService);
}

@VisibleForTesting
SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks,
ScheduledExecutorService scheduler) {
this(schemaStorage, compatibilityChecks, Clock.systemUTC(), scheduler);
PulsarService pulsarService) {
this(schemaStorage, compatibilityChecks, Clock.systemUTC(), pulsarService);
}

@Override
Expand Down Expand Up @@ -136,16 +136,17 @@ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVer
}
})
.whenComplete((v, t) -> {
var latencyMs = this.clock.millis() - start;
if (t != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Get schema failed", schemaId);
}
this.stats.recordGetFailed(schemaId);
this.stats.recordGetFailed(schemaId, latencyMs);
} else {
if (log.isDebugEnabled()) {
log.debug(null == v ? "[{}] Schema not found" : "[{}] Schema is present", schemaId);
}
this.stats.recordGetLatency(schemaId, this.clock.millis() - start);
this.stats.recordGetLatency(schemaId, latencyMs);
}
});
}
Expand All @@ -157,10 +158,11 @@ public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchem
return schemaStorage.getAll(schemaId)
.thenCompose(schemas -> convertToSchemaAndMetadata(schemaId, schemas))
.whenComplete((v, t) -> {
var latencyMs = this.clock.millis() - start;
if (t != null) {
this.stats.recordGetFailed(schemaId);
this.stats.recordListFailed(schemaId, latencyMs);
} else {
this.stats.recordGetLatency(schemaId, this.clock.millis() - start);
this.stats.recordListLatency(schemaId, latencyMs);
}
});
}
Expand Down Expand Up @@ -228,10 +230,11 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
});
}))).whenComplete((v, ex) -> {
var latencyMs = this.clock.millis() - start.getValue();
if (ex != null) {
log.error("[{}] Put schema failed", schemaId, ex);
if (start.getValue() != 0) {
this.stats.recordPutFailed(schemaId);
this.stats.recordPutFailed(schemaId, latencyMs);
}
promise.completeExceptionally(ex);
} else {
Expand Down Expand Up @@ -261,14 +264,15 @@ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String use
return schemaStorage
.put(schemaId, deletedEntry, new byte[]{})
.whenComplete((v, t) -> {
var latencyMs = this.clock.millis() - start;
if (t != null) {
log.error("[{}] User {} delete schema failed", schemaId, user);
this.stats.recordDelFailed(schemaId);
this.stats.recordDelFailed(schemaId, latencyMs);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] User {} delete schema finished", schemaId, user);
}
this.stats.recordDelLatency(schemaId, this.clock.millis() - start);
this.stats.recordDelLatency(schemaId, latencyMs);
}
});
}
Expand All @@ -284,11 +288,12 @@ public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boo

return schemaStorage.delete(schemaId, forcefully)
.whenComplete((v, t) -> {
var latencyMs = this.clock.millis() - start;
if (t != null) {
this.stats.recordDelFailed(schemaId);
this.stats.recordDelFailed(schemaId, latencyMs);
log.error("[{}] Delete schema storage failed", schemaId);
} else {
this.stats.recordDelLatency(schemaId, this.clock.millis() - start);
this.stats.recordDelLatency(schemaId, latencyMs);
if (log.isDebugEnabled()) {
log.debug("[{}] Delete schema storage finished", schemaId);
}
Expand Down
Loading
Loading