From 2f5f5ce1623f374f17d67d8c151e8d13d7684078 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Tue, 5 Dec 2023 10:21:51 +0800 Subject: [PATCH] fix(s3stream): fix timeout detect (#793) Signed-off-by: Robin Han --- pom.xml | 2 +- s3stream/pom.xml | 2 +- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 4 ++-- .../java/com/automq/stream/s3/operator/DefaultS3Operator.java | 3 +-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 4d553b2e5..58d7886b4 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 32.1.3-jre 2.0.9 2.2 - 0.6.7-SNAPSHOT + 0.6.8-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index a15fbd149..1a21eccc4 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.6.7-SNAPSHOT + 0.6.8-SNAPSHOT 5.5.0 5.10.0 diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 88da704f6..3cde75643 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -360,7 +360,7 @@ private CompletableFuture read0(long streamId, long startOffset, if (!logCacheRecords.isEmpty()) { endOffset = logCacheRecords.get(0).getBaseOffset(); } - Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.error("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES); + Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES); return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> { List rst = new ArrayList<>(readDataBlock.getRecords()); int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum(); @@ -378,11 +378,11 @@ private CompletableFuture read0(long streamId, long startOffset, continuousCheck(rst); return new ReadDataBlock(rst, readDataBlock.getCacheAccessType()); }).whenComplete((rst, ex) -> { + timeout.cancel(); if (ex != null) { LOGGER.error("read from block cache failed, stream={}, {}-{}, maxBytes: {}", streamId, startOffset, maxBytes, ex); logCacheRecords.forEach(StreamRecordBatch::release); - timeout.cancel(); } }); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 192f4013a..1f4e80e1f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -187,8 +187,7 @@ public CompletableFuture rangeRead(String path, long start, long end, T } Timeout timeout = timeoutDetect.newTimeout((t) -> LOGGER.warn("rangeRead {} {}-{} timeout", path, start, end), 1, TimeUnit.MINUTES); - cf.whenComplete((rst, ex) -> timeout.cancel()); - return cf; + return cf.whenComplete((rst, ex) -> timeout.cancel()); } private void rangeRead0(String path, long start, long end, CompletableFuture cf) {