Skip to content

Commit

Permalink
fix(s3stream): fix issues402 memory leak (#403)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx authored Oct 20, 2023
1 parent 6a0cea8 commit 47cf183
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 11 deletions.
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.12-SNAPSHOT</version>
<version>0.1.13-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,13 @@ private void ensureCapacity(int size) {
continue;
}
CacheBlock cacheBlock = streamCache.blocks.remove(entry.getKey().startOffset);
cacheBlock.free();
if (maxSize - this.size.addAndGet(-cacheBlock.size) >= size) {
return;
if (cacheBlock == null) {
LOGGER.error("[BUG] Cannot find stream cache block: {} {}", entry.getKey().streamId, entry.getKey().startOffset);
} else {
cacheBlock.free();
if (maxSize - this.size.addAndGet(-cacheBlock.size) >= size) {
return;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package com.automq.stream.s3.cache;

import com.automq.stream.s3.ObjectReader;
import com.automq.stream.utils.FutureUtil;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -30,6 +33,7 @@
* Accumulate inflight data block read requests to one real read request.
*/
public class DataBlockReadAccumulator {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReadAccumulator.class);
private final Map<Pair<String, Integer>, DataBlockRecords> inflightDataBlockReads = new ConcurrentHashMap<>();
private final Consumer<DataBlockRecords> dataBlockConsumer;

Expand Down Expand Up @@ -60,7 +64,10 @@ public CompletableFuture<DataBlockRecords> readDataBlock(ObjectReader reader, Ob
inflightDataBlockReads.remove(key, finalRecords);
}
finalRecords.complete(dataBlock, ex);
dataBlockConsumer.accept(finalRecords);
FutureUtil.suppress(() -> dataBlockConsumer.accept(finalRecords), LOGGER);
} catch (Throwable e) {
LOGGER.error("[UNEXPECTED] DataBlockRecords fail to notify listener {}", listener, e);
} finally {
finalRecords.release();
}
});
Expand Down
12 changes: 6 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LRUCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ public LRUCache() {
cacheEntrySet = cache.entrySet();
}

public boolean touch(K key) {
public synchronized boolean touch(K key) {
return cache.get(key) != null;
}

public void put(K key, V value) {
public synchronized void put(K key, V value) {
if (cache.put(key, value) != null) {
touch(key);
}
}

public V get(K key) {
public synchronized V get(K key) {
return cache.get(key);
}

public Map.Entry<K, V> pop() {
public synchronized Map.Entry<K, V> pop() {
Iterator<Map.Entry<K, V>> it = cacheEntrySet.iterator();
if (!it.hasNext()) {
return null;
Expand All @@ -58,11 +58,11 @@ public Map.Entry<K, V> pop() {
return entry;
}

public boolean remove(K key) {
public synchronized boolean remove(K key) {
return cache.remove(key) != null;
}

public int size() {
public synchronized int size() {
return cache.size();
}
}

0 comments on commit 47cf183

Please # to comment.