fix(s3stream): checkstyle
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
superhx committed Oct 26, 2023
1 parent c751dcd commit eed8773
Showing 2 changed files with 18 additions and 8 deletions.
15 changes: 9 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
@@ -17,6 +17,7 @@
 
 package com.automq.stream.s3;
 
+import com.automq.stream.utils.Threads;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -29,7 +30,8 @@
 public class DirectByteBufAlloc {
     private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
     private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
-    private static final List<OOMHandler> OOMHandlers = new ArrayList<>();
+    private static final List<OOMHandler> OOM_HANDLERS = new ArrayList<>();
+    private static long lastLogTimestamp = 0L;
 
     public static CompositeByteBuf compositeByteBuffer() {
         return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
@@ -41,7 +43,7 @@ public static ByteBuf byteBuffer(int initCapacity) {
         } catch (OutOfMemoryError e) {
             for (;;) {
                 int freedBytes = 0;
-                for (OOMHandler handler : OOMHandlers) {
+                for (OOMHandler handler : OOM_HANDLERS) {
                     freedBytes += handler.handle(initCapacity);
                     try {
                         ByteBuf buf = ALLOC.directBuffer(initCapacity);
@@ -51,16 +53,17 @@ public static ByteBuf byteBuffer(int initCapacity) {
                         // ignore
                     }
                 }
-                if (freedBytes == 0) {
-                    break;
+                if (System.currentTimeMillis() - lastLogTimestamp >= 1000L) {
+                    LOGGER.error("try recover from OOM fail, freedBytes={}, retry later", freedBytes);
+                    lastLogTimestamp = System.currentTimeMillis();
                 }
+                Threads.sleep(1L);
             }
-            throw e;
         }
     }
 
     public static void registerOOMHandlers(OOMHandler handler) {
-        OOMHandlers.add(handler);
+        OOM_HANDLERS.add(handler);
     }
 
     public interface OOMHandler {
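
As a minimal usage sketch of the hook touched above: the code below assumes OOMHandler is a single-method interface that takes the requested capacity and returns the number of bytes it freed (consistent with freedBytes += handler.handle(initCapacity) in the hunk above). The class name and handler body are hypothetical, for illustration only.

import com.automq.stream.s3.DirectByteBufAlloc;

public class OOMHandlerExample {
    public static void main(String[] args) {
        // Register a handler that the allocation retry loop above calls on OutOfMemoryError.
        // The lambda shape assumes OOMHandler declares a single int-returning method taking
        // the requested capacity; the freed-bytes value is a placeholder for real eviction logic.
        DirectByteBufAlloc.registerOOMHandlers(memoryRequired -> {
            int freed = 0; // e.g. evict cached blocks here and count the bytes released
            return freed;
        });

        // Subsequent allocations retry through the registered handlers if direct memory runs out.
        DirectByteBufAlloc.byteBuffer(4096).release();
    }
}
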
11 changes: 9 additions & 2 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -86,6 +86,7 @@ public class S3Storage implements Storage {
 
     private final Queue<BackoffRecord> backoffRecords = new LinkedBlockingQueue<>();
     private final ScheduledFuture<?> drainBackoffTask;
+    private long lastLogTimestamp = 0L;
 
     private final StreamManager streamManager;
     private final ObjectManager objectManager;
@@ -225,7 +226,10 @@ public void append0(StreamRecordBatch streamRecord, CompletableFuture<Void> cf)
         if (!tryAcquirePermit()) {
             backoffRecords.offer(new BackoffRecord(streamRecord, cf));
             OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc();
-            LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
+            if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
+                LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
+                lastLogTimestamp = System.currentTimeMillis();
+            }
             return;
         }
         WriteAheadLog.AppendResult appendResult;
@@ -237,7 +241,10 @@ public void append0(StreamRecordBatch streamRecord, CompletableFuture<Void> cf)
             // the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen.
             forceUpload(LogCache.MATCH_ALL_STREAMS);
             backoffRecords.offer(new BackoffRecord(streamRecord, cf));
-            LOGGER.warn("[BACKOFF] log over capacity", e);
+            if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
+                LOGGER.warn("[BACKOFF] log over capacity", e);
+                lastLogTimestamp = System.currentTimeMillis();
+            }
             return;
         }
         WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.recordOffset(), cf);
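
Both files apply the same throttling pattern: record the timestamp of the last emitted message and log again only after at least a second has passed. A minimal standalone sketch of that pattern, with illustrative class, method, and field names not taken from the patch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThrottledLogger {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThrottledLogger.class);
    private static final long INTERVAL_MS = 1000L;
    private long lastLogTimestamp = 0L;

    void warnBackoff(long cacheSize, long maxCacheSize) {
        long now = System.currentTimeMillis();
        // Emit at most one warning per interval so a hot backoff path cannot flood the log.
        if (now - lastLogTimestamp > INTERVAL_MS) {
            LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", cacheSize, maxCacheSize);
            lastLogTimestamp = now;
        }
    }
}

The timestamp check runs on the caller's thread without synchronization; at worst a race lets an occasional extra line through, which is acceptable for log throttling.
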

0 comments on commit eed8773
