diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index db22b3d67..f654df6ee 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -241,7 +241,9 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
             return true;
         }
         if (!tryAcquirePermit()) {
-            backoffRecords.offer(request);
+            if (!fromBackoff) {
+                backoffRecords.offer(request);
+            }
             OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc();
             if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
                 LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
@@ -257,7 +259,9 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
         } catch (WriteAheadLog.OverCapacityException e) {
             // the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen.
             forceUpload(LogCache.MATCH_ALL_STREAMS);
-            backoffRecords.offer(request);
+            if (!fromBackoff) {
+                backoffRecords.offer(request);
+            }
             if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
                 LOGGER.warn("[BACKOFF] log over capacity", e);
                 lastLogTimestamp = System.currentTimeMillis();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java
index 69ae60e6b..8c04c096b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java
@@ -86,6 +86,7 @@ public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<S3ObjectMetadata> objectMetadataList,
                     return new AbstractMap.SimpleEntry<>(f.getKey(), validStreamDataBlocks);
                 } catch (Exception ex) {
                     // continue compaction without invalid object
+                    // TODO: log warn
                     return null;
                 }
             })
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index 4ca135072..39522dd53 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -18,7 +18,6 @@
 package com.automq.stream.s3.compact.operator;
 
 import com.automq.stream.s3.DirectByteBufAlloc;
-import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.compact.objects.StreamDataBlock;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.operator.S3Operator;
@@ -36,7 +35,7 @@
 
 //TODO: refactor to reduce duplicate code with ObjectWriter
 public class DataBlockReader {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ObjectReader.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class);
     private final S3ObjectMetadata metadata;
     private final String objectKey;
     private final S3Operator s3Operator;
@@ -66,7 +65,7 @@ public void parseDataBlockIndex(long startPosition) {
             }
         }).exceptionally(ex -> {
             // unrecoverable error, possibly read on a deleted object
-            LOGGER.warn("s3 range read from {} [{}, {}) failed, ex", objectKey, startPosition, metadata.objectSize(), ex);
+            LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.objectSize(), ex);
             indexBlockCf.completeExceptionally(ex);
             return null;
         });
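For context on the S3Storage hunks above: the fromBackoff guard only makes sense if requests replayed from backoffRecords are drained with a peek-then-poll loop, so a request that fails again stays at the queue head and must not be offered a second time (doing so would enqueue a duplicate at the tail). Below is a minimal, self-contained sketch of that pattern; Request, tryAppend, cacheFull and BackoffDrainSketch are illustrative stand-ins, not the repository's exact code.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the peek/poll drain pattern that the fromBackoff guard protects.
public class BackoffDrainSketch {
    record Request(long streamId) {}

    private final Queue<Request> backoffRecords = new ConcurrentLinkedQueue<>();

    // Returns true when the request was backoffed instead of appended.
    boolean tryAppend(Request request, boolean fromBackoff) {
        boolean cacheFull = true; // stand-in for !tryAcquirePermit()
        if (cacheFull) {
            if (!fromBackoff) {
                backoffRecords.offer(request); // first failure: park for retry
            }
            // A fromBackoff request is still at the queue head (the drain
            // loop below only peeked), so offering it again here would
            // enqueue a duplicate copy at the tail.
            return true;
        }
        return false;
    }

    void tryDrainBackoffRecords() {
        for (;;) {
            Request request = backoffRecords.peek();
            if (request == null) {
                break;
            }
            if (tryAppend(request, true)) {
                break; // still backoffed: leave it at the head, retry later
            }
            backoffRecords.poll(); // appended successfully: now remove it
        }
    }
}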
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index b871b7fce..f126d7977 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -243,10 +243,10 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
 […]
                 […].schedule(() -> mergedRangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS);
             }
             return null;
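The DefaultS3Operator hunk above survives only in fragments, but the visible tail shows the retry path: a failed merged range read is rescheduled after 100 ms instead of failing the caller's future outright, while unrecoverable errors complete the future exceptionally. A minimal sketch of that pattern follows; readOnce, isUnrecoverable and DelayedRetrySketch are hypothetical stand-ins, and byte[] replaces Netty's ByteBuf to keep the sketch dependency-free.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of retry-with-delay for a merged range read: transient failures are
// rescheduled on a timer; the caller's future only fails on unrecoverable errors.
public class DelayedRetrySketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void rangeRead0(String path, long start, long end, CompletableFuture<byte[]> cf) {
        readOnce(path, start, end).whenComplete((data, ex) -> {
            if (ex == null) {
                cf.complete(data);
            } else if (isUnrecoverable(ex)) {
                cf.completeExceptionally(ex); // e.g. reading a deleted object
            } else {
                // Transient failure: retry the same merged range after 100 ms.
                scheduler.schedule(() -> rangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS);
            }
        });
    }

    private CompletableFuture<byte[]> readOnce(String path, long start, long end) {
        return CompletableFuture.completedFuture(new byte[0]); // placeholder I/O
    }

    private boolean isUnrecoverable(Throwable ex) {
        return false; // placeholder error classification
    }
}

One design note: because the retry re-enters the same method with the same CompletableFuture, callers that are already waiting on the future need no retry logic of their own.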