Skip to content

Commit

Permalink
fix(kafka_issues475): do not log when fast fail (#751)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx authored Nov 28, 2023
1 parent f5224c1 commit 701892f
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.6.2-SNAPSHOT</s3stream.version>
<s3stream.version>0.6.3-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.6.2-SNAPSHOT</version>
<version>0.6.3-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
*/
public class FastReadFailFastException extends StreamClientException {
public FastReadFailFastException() {
super(ErrorCode.FAST_READ_FAIL_FAST, "");
super(ErrorCode.FAST_READ_FAIL_FAST, "", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public StreamClientException(int code, String str, Throwable e) {
this.code = code;
}

public StreamClientException(int code, String str, boolean writableStackTrace) {
super("code: " + code + ", " + str, null, false, writableStackTrace);
this.code = code;
}

public int getCode() {
return this.code;
}
Expand Down
6 changes: 5 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.RecordBatchWithContext;
import com.automq.stream.api.Stream;
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.metrics.TimerUtil;
Expand Down Expand Up @@ -190,7 +191,10 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
cf.whenComplete((rs, ex) -> {
OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
}
} else if (networkOutboundLimiter != null) {
long totalSize = rs.recordBatchList().stream().mapToLong(record -> record.rawPayload().remaining()).sum();
networkOutboundLimiter.forceConsume(totalSize);
Expand Down

0 comments on commit 701892f

Please # to comment.