Skip to content

Commit

Permalink
feat(s3stream): limit s3 operator inflight request (#500)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx authored Oct 29, 2023
1 parent 748daa6 commit 54225fe
Showing 1 changed file with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -80,6 +81,9 @@ public class DefaultS3Operator implements S3Operator {
private final String bucket;
private final S3AsyncClient writeS3Client;
private final S3AsyncClient readS3Client;
private final Semaphore inflightWriteLimiter;
private final Semaphore inflightReadLimiter;

private final List<ReadTask> waitingReadTasks = new LinkedList<>();
private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter;
private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter;
Expand All @@ -100,6 +104,8 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client;
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter;
this.bucket = bucket;
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
checkConfig();
Expand All @@ -123,6 +129,8 @@ public static Builder builder() {
this.bucket = bucket;
this.networkInboundBandwidthLimiter = null;
this.networkOutboundBandwidthLimiter = null;
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = new Semaphore(50);
if (!manualMergeRead) {
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -221,7 +229,7 @@ private int availableReadPermit() {

CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
end = end - 1;
CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
CompletableFuture<ByteBuf> cf = acquireReadPermit(new CompletableFuture<>());
mergedRangeRead0(path, start, end, cf);
return cf;
}
Expand Down Expand Up @@ -252,7 +260,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB

@Override
public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy throttleStrategy) {
CompletableFuture<Void> cf = new CompletableFuture<>();
CompletableFuture<Void> cf = acquireWritePermit(new CompletableFuture<>());
if (networkOutboundBandwidthLimiter != null) {
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -342,7 +350,7 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {

@Override
public CompletableFuture<String> createMultipartUpload(String path) {
CompletableFuture<String> cf = new CompletableFuture<>();
CompletableFuture<String> cf = acquireWritePermit(new CompletableFuture<>());
createMultipartUpload0(path, cf);
return cf;
}
Expand Down Expand Up @@ -370,7 +378,7 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {

@Override
public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId, int partNumber, ByteBuf data, ThrottleStrategy throttleStrategy) {
CompletableFuture<CompletedPart> cf = new CompletableFuture<>();
CompletableFuture<CompletedPart> cf = acquireWritePermit(new CompletableFuture<>());
if (networkOutboundBandwidthLimiter != null) {
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -413,7 +421,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p

@Override
public CompletableFuture<CompletedPart> uploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber) {
CompletableFuture<CompletedPart> cf = new CompletableFuture<>();
CompletableFuture<CompletedPart> cf = acquireWritePermit(new CompletableFuture<>());
uploadPartCopy0(sourcePath, path, start, end, uploadId, partNumber, cf);
return cf;
}
Expand Down Expand Up @@ -445,7 +453,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en

@Override
public CompletableFuture<Void> completeMultipartUpload(String path, String uploadId, List<CompletedPart> parts) {
CompletableFuture<Void> cf = new CompletableFuture<>();
CompletableFuture<Void> cf = acquireWritePermit(new CompletableFuture<>());
completeMultipartUpload0(path, uploadId, parts, cf);
return cf;
}
Expand Down Expand Up @@ -552,6 +560,27 @@ private static S3AsyncClient newS3Client(String endpoint, String region, boolean
return builder.build();
}

<T> CompletableFuture<T> acquireReadPermit(CompletableFuture<T> cf) {
// TODO: async acquire?
try {
inflightReadLimiter.acquire();
return cf.whenComplete((rst, ex) -> inflightReadLimiter.release());
} catch (InterruptedException e) {
cf.completeExceptionally(e);
return cf;
}
}

<T> CompletableFuture<T> acquireWritePermit(CompletableFuture<T> cf) {
try {
inflightWriteLimiter.acquire();
return cf.whenComplete((rst, ex) -> inflightWriteLimiter.release());
} catch (InterruptedException e) {
cf.completeExceptionally(e);
return cf;
}
}

static class MergedReadTask {
static final int MAX_MERGE_READ_SIZE = 16 * 1024 * 1024;
final String path;
Expand Down

0 comments on commit 54225fe

Please # to comment.