Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): simplify operation counter metrics #553

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.20-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.21-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.20-SNAPSHOT</version>
<version>0.1.21-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
11 changes: 4 additions & 7 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
append0(writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE).update(timerUtil.elapsed());
});
return cf;
}
Expand All @@ -247,7 +246,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
if (!fromBackoff) {
backoffRecords.offer(request);
}
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).inc();
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -305,8 +304,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsed());
});
return cf;
}
Expand Down Expand Up @@ -409,8 +407,7 @@ CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
inflightWALUploadTasks.add(cf);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadWALObject0(logCacheBlock, cf), cf, LOGGER, "uploadWALObject"));
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_STORAGE_WAL).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_STORAGE_WAL).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.UPLOAD_STORAGE_WAL).update(timerUtil.elapsed());
inflightWALUploadTasks.remove(cf);
if (ex != null) {
LOGGER.error("upload WAL object fail", ex);
Expand Down
9 changes: 3 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationTime.update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(System.currentTimeMillis() - start);
pendingAppends.remove(cf);
});
return cf;
Expand Down Expand Up @@ -183,8 +182,7 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationTime.update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(System.currentTimeMillis() - start);
if (ex != null) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
} else if (networkOutboundLimiter != null) {
Expand Down Expand Up @@ -229,8 +227,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.TRIM_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.TRIM_STREAM).operationTime.update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(System.currentTimeMillis() - start);
});
return cf;
}, LOGGER, "trim");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_STREAM).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsed());
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
Expand Down Expand Up @@ -140,8 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsed());
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.s3StreamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeMinutes() * 60L * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
}

if (ret.isCacheHit()) {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
} else {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
}
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE).operationTime.update(timerUtil.elapsedAndReset());
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAndReset());
});
return readCf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public boolean put(StreamRecordBatch recordBatch) {
tryRealFree();
size.addAndGet(recordBatch.size());
boolean full = activeBlock.put(recordBatch);
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_LOG_CACHE).update(timerUtil.elapsed());
return full;
}

Expand Down Expand Up @@ -104,11 +103,11 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffs
List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
} else {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE_MISS).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE_MISS).inc();
}
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_LOG_CACHE).update(timerUtil.elapsed());
return records;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,30 @@
import java.util.concurrent.ConcurrentHashMap;

public class OperationMetricsStats {
private static final Map<String, OperationMetrics> OPERATION_METRICS_MAP = new ConcurrentHashMap<>();
private static final Map<String, Counter> OPERATION_COUNTER_MAP = new ConcurrentHashMap<>();
private static final Map<String, Histogram> OPERATION_HIST_MAP = new ConcurrentHashMap<>();

public static OperationMetrics getOrCreateOperationMetrics(S3Operation s3Operation) {
return OPERATION_METRICS_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id ->
new OperationMetrics(s3Operation));
public static Counter getCounter(S3Operation s3Operation) {
return getOrCreateCounterMetrics(s3Operation);
}

public static class OperationMetrics {
public final Counter operationCount;
public final Histogram operationTime;
public static Histogram getHistogram(S3Operation s3Operation) {
return getOrCreateHistMetrics(s3Operation);
}

private static Counter getOrCreateCounterMetrics(S3Operation s3Operation) {
return OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("operation_count" + Counter.SUFFIX, tags(s3Operation)));
}

public OperationMetrics(S3Operation s3Operation) {
Map<String, String> tags = Map.of(
"operation", s3Operation.getName(),
"op_type", s3Operation.getType().getName());
operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags);
operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags);
}
private static Histogram getOrCreateHistMetrics(S3Operation s3Operation) {
return OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
.newHistogram("operation_time", tags(s3Operation)));
}

private static Map<String, String> tags(S3Operation s3Operation) {
return Map.of(
"operation", s3Operation.getName(),
"op_type", s3Operation.getType().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class S3ObjectMetricsStats {
public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());

public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) {
public static Histogram getHistogram(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Map<String, String> tags = Map.of("stage", stage.getName());
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
Expand Down
Loading