Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(s3stream/storage): remove the single main write thread #728

Merged
merged 3 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;


Expand All @@ -76,8 +79,6 @@ public class S3Storage implements Storage {
private final Queue<DeltaWALUploadTaskContext> walCommitQueue = new LinkedList<>();
private final List<CompletableFuture<Void>> inflightWALUploadTasks = new CopyOnWriteArrayList<>();

private final ExecutorService mainWriteExecutor = Threads.newFixedThreadPoolWithMonitor(1,
"s3-storage-main-write", false, LOGGER);
private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor(
Expand All @@ -91,6 +92,12 @@ public class S3Storage implements Storage {
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final S3BlockCache blockCache;
/**
* Stream callback locks. Used to ensure the stream callbacks will not be called concurrently.
*
* @see #handleAppendCallback
*/
private final Map<Long, Lock> streamCallbackLocks = new ConcurrentHashMap<>();

public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager objectManager,
S3BlockCache blockCache, S3Operator s3Operator) {
Expand Down Expand Up @@ -223,7 +230,6 @@ public void shutdown() {
}
deltaWAL.shutdownGracefully();
backgroundExecutor.shutdown();
mainWriteExecutor.shutdown();
}


Expand Down Expand Up @@ -372,31 +378,40 @@ public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> {
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
callbackSequencer.tryFree(streamId);
});
return cf;
}

private void handleAppendRequest(WalWriteRequest request) {
mainWriteExecutor.execute(() -> callbackSequencer.before(request));
callbackSequencer.before(request);
}

private void handleAppendCallback(WalWriteRequest request) {
mainWriteExecutor.execute(() -> handleAppendCallback0(request));
}

private void handleAppendCallback0(WalWriteRequest request) {
List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
TimerUtil timer = new TimerUtil();
List<WalWriteRequest> waitingAckRequests;
Lock lock = getStreamCallbackLock(request.record.getStreamId());
lock.lock();
try {
waitingAckRequests = callbackSequencer.after(request);
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
}
}
} finally {
lock.unlock();
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_APPEND_CALLBACK).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
}

/**
* Returns the callback lock for the given stream, creating it on first use.
* The lock is looked up in {@code streamCallbackLocks}; when no entry exists for
* {@code streamId}, a new {@link ReentrantLock} is inserted via {@code computeIfAbsent}.
* {@code handleAppendCallback} holds this lock while sequencing persisted requests for the stream.
* NOTE(review): no removal of entries from {@code streamCallbackLocks} is visible in this diff —
* confirm whether locks of closed streams are ever cleaned up.
*
* @param streamId id of the stream whose callback lock is requested
* @return the lock registered for {@code streamId}
*/
private Lock getStreamCallbackLock(long streamId) {
return streamCallbackLocks.computeIfAbsent(streamId, id -> new ReentrantLock());
}

@SuppressWarnings("UnusedReturnValue")
Expand Down Expand Up @@ -508,21 +523,25 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) {
}

/**
* WALCallbackSequencer is modified in single thread mainExecutor.
* WALCallbackSequencer is used to sequence the unordered returned persistent data.
*/
static class WALCallbackSequencer {
public static final long NOOP_OFFSET = -1L;
private final Map<Long, Queue<WalWriteRequest>> stream2requests = new HashMap<>();
private final Map<Long, BlockingQueue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
private final BlockingQueue<WalWriteRequest> walRequests = new LinkedBlockingQueue<>();
private long walConfirmOffset = NOOP_OFFSET;

/**
* Add request to stream sequence queue.
* When the {@code request.record.getStreamId()} is different, concurrent calls are allowed.
* When the {@code request.record.getStreamId()} is the same, concurrent calls are not allowed. And it is
* necessary to ensure that calls are made in the order of increasing offsets.
*/
public void before(WalWriteRequest request) {
try {
walRequests.put(request);
Queue<WalWriteRequest> streamRequests = stream2requests.computeIfAbsent(request.record.getStreamId(), s -> new LinkedBlockingQueue<>());
assert streamRequests.isEmpty() || streamRequests.peek().offset < request.offset;
streamRequests.add(request);
} catch (Throwable ex) {
request.cf.completeExceptionally(ex);
Expand All @@ -531,12 +550,15 @@ public void before(WalWriteRequest request) {

/**
* Try pop sequence persisted request from stream queue and move forward wal inclusive confirm offset.
* When the {@code request.record.getStreamId()} is different, concurrent calls are allowed.
* When the {@code request.record.getStreamId()} is the same, concurrent calls are not allowed.
*
* @return popped sequence persisted request.
*/
public List<WalWriteRequest> after(WalWriteRequest request) {
request.persisted = true;
// move the WAL inclusive confirm offset.
// FIXME: requests in walRequests may not be in order.
for (; ; ) {
WalWriteRequest peek = walRequests.peek();
if (peek == null || !peek.persisted) {
Expand All @@ -548,19 +570,33 @@ public List<WalWriteRequest> after(WalWriteRequest request) {

// pop sequence success stream request.
long streamId = request.record.getStreamId();
Queue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
BlockingQueue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
WalWriteRequest peek = streamRequests.peek();
if (peek == null || peek.offset != request.offset) {
return Collections.emptyList();
}
List<WalWriteRequest> rst = new ArrayList<>();
rst.add(streamRequests.poll());
if (streamRequests.remove(request)) {
rst.add(request);
} else {
// Should not happen.
LOGGER.error("request was removed by other thread after it was persisted. streamId={}, offset={}",
streamId, request.offset);
assert false;
}
for (; ; ) {
peek = streamRequests.peek();
if (peek == null || !peek.persisted) {
break;
}
rst.add(streamRequests.poll());
if (streamRequests.remove(peek)) {
rst.add(peek);
} else {
// Should not happen.
LOGGER.error("request was removed by other thread after it was persisted. streamId={}, offset={}, peekOffset={}",
streamId, request.offset, peek.offset);
assert false;
}
}
return rst;
}
Expand Down
20 changes: 12 additions & 8 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
this(capacity, cacheBlockMaxSize, maxCacheBlockStreamCount, DEFAULT_BLOCK_FREE_LISTENER);
}

/**
* Put a record batch into the cache.
* Record batches in the same stream should be put in order.
*/
public boolean put(StreamRecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
tryRealFree();
Expand Down Expand Up @@ -196,7 +200,7 @@ public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {

Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {
if (streamId == MATCH_ALL_STREAMS) {
if (activeBlock.size > 0) {
if (activeBlock.size() > 0) {
return Optional.of(archiveCurrentBlock());
} else {
return Optional.empty();
Expand Down Expand Up @@ -228,7 +232,7 @@ private void tryRealFree() {
return false;
}
if (b.free) {
size.addAndGet(-b.size);
size.addAndGet(-b.size());
removed.add(b);
}
return b.free;
Expand All @@ -251,8 +255,9 @@ public int forceFree(int required) {
if (!block.free || freedBytes.get() >= required) {
return false;
}
size.addAndGet(-block.size);
freedBytes.addAndGet((int) block.size);
long blockSize = block.size();
size.addAndGet(-blockSize);
freedBytes.addAndGet((int) blockSize);
removed.add(block);
return true;
});
Expand Down Expand Up @@ -280,7 +285,7 @@ public static class LogCacheBlock {
private final long maxSize;
private final int maxStreamCount;
private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
private long size = 0;
private final AtomicLong size = new AtomicLong();
private long confirmOffset;
volatile boolean free;

Expand Down Expand Up @@ -310,8 +315,7 @@ public boolean put(StreamRecordBatch recordBatch) {
return records;
});
int recordSize = recordBatch.size();
size += recordSize;
return size >= maxSize || map.size() >= maxStreamCount;
return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount;
}

public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
Expand Down Expand Up @@ -371,7 +375,7 @@ public void confirmOffset(long confirmOffset) {
}

public long size() {
return size;
return size.get();
}

public void free() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum S3Operation {
/* S3 storage operations start */
APPEND_STORAGE(S3MetricsType.S3Storage, "append"),
APPEND_STORAGE_WAL(S3MetricsType.S3Storage, "append_wal"),
APPEND_STORAGE_APPEND_CALLBACK(S3MetricsType.S3Storage, "append_callback"),
APPEND_STORAGE_WAL_FULL(S3MetricsType.S3Storage, "append_wal_full"),
APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"),
APPEND_STORAGE_LOG_CACHE_FULL(S3MetricsType.S3Storage, "append_log_cache_full"),
Expand Down