Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(store): add s3 stream compaction config #565

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ public class S3StreamConfig {
private long networkBaselineBandwidth = 0;
private int refillPeriodMs = 1000;

private int streamObjectCompactionIntervalMinutes = 60;
private long streamObjectCompactionMaxSizeBytes = 10737418240L;
private int streamObjectCompactionLivingTimeMinutes = 60;

private int walObjectCompactionInterval = 20;
private long walObjectCompactionCacheSize = 200 * 1024 * 1024;
private int walObjectCompactionUploadConcurrency = 8;
private long walObjectCompactionStreamSplitSize = 16 * 1024 * 1024;
private int walObjectCompactionForceSplitPeriod = 120;
private int walObjectCompactionMaxObjectNum = 500;

public String s3Endpoint() {
return s3Endpoint;
}
Expand Down Expand Up @@ -65,4 +76,40 @@ public long networkBaselineBandwidth() {
public int refillPeriodMs() {
return refillPeriodMs;
}

public int streamObjectCompactionIntervalMinutes() {
return streamObjectCompactionIntervalMinutes;
}

public long streamObjectCompactionMaxSizeBytes() {
return streamObjectCompactionMaxSizeBytes;
}

public int streamObjectCompactionLivingTimeMinutes() {
return streamObjectCompactionLivingTimeMinutes;
}

public int walObjectCompactionInterval() {
return walObjectCompactionInterval;
}

public long walObjectCompactionCacheSize() {
return walObjectCompactionCacheSize;
}

public int walObjectCompactionUploadConcurrency() {
return walObjectCompactionUploadConcurrency;
}

public long walObjectCompactionStreamSplitSize() {
return walObjectCompactionStreamSplitSize;
}

public int walObjectCompactionForceSplitPeriod() {
return walObjectCompactionForceSplitPeriod;
}

public int walObjectCompactionMaxObjectNum() {
return walObjectCompactionMaxObjectNum;
}
}
11 changes: 11 additions & 0 deletions store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ private Config configFrom(S3StreamConfig streamConfig) {
config.s3SecretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());

// Compaction config
config.s3StreamObjectCompactionIntervalMinutes(streamConfig.streamObjectCompactionIntervalMinutes());
config.s3StreamObjectCompactionMaxSizeBytes(streamConfig.streamObjectCompactionMaxSizeBytes());
config.s3StreamObjectCompactionLivingTimeMinutes(streamConfig.streamObjectCompactionLivingTimeMinutes());
config.s3WALObjectCompactionInterval(streamConfig.walObjectCompactionInterval());
config.s3WALObjectCompactionCacheSize(streamConfig.walObjectCompactionCacheSize());
config.s3WALObjectCompactionUploadConcurrency(streamConfig.walObjectCompactionUploadConcurrency());
config.s3WALObjectCompactionMaxObjectNum(streamConfig.walObjectCompactionMaxObjectNum());
config.s3WALObjectCompactionForceSplitPeriod(streamConfig.walObjectCompactionForceSplitPeriod());
config.s3WALObjectCompactionStreamSplitSize(streamConfig.walObjectCompactionStreamSplitSize());
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public static Operation decodeOperation(ByteBuffer buffer,
operationStreamId, snapshotStreamId, stateMachine,
resetConsumeOffsetOperation.operationTimestamp(), resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset());
}
default -> throw new IllegalStateException("Unexpected value: " + operationLogItem.operationType());
default ->
throw new IllegalStateException("Unexpected operation type: " + operationLogItem.operationType());
}
}

Expand Down