Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: localize s3 metadata access #543

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.automq.rocketmq.metadata.DefaultStoreMetadataService;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.metadata.api.StoreMetadataService;
import com.automq.rocketmq.metadata.s3.DefaultS3MetadataService;
import com.automq.rocketmq.metadata.api.S3MetadataService;
import com.automq.rocketmq.proxy.config.ProxyConfiguration;
import com.automq.rocketmq.proxy.grpc.GrpcProtocolServer;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
Expand Down Expand Up @@ -64,7 +66,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
metadataStore = MetadataStoreBuilder.build(brokerConfig);

proxyMetadataService = new DefaultProxyMetadataService(metadataStore);
storeMetadataService = new DefaultStoreMetadataService(metadataStore);
S3MetadataService s3MetadataService = new DefaultS3MetadataService(metadataStore.config(),
metadataStore.sessionFactory(), metadataStore.asyncExecutor());
storeMetadataService = new DefaultStoreMetadataService(metadataStore, s3MetadataService);

dlqService = new DeadLetterService(brokerConfig, proxyMetadataService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ default int workloadTolerance() {
return 1;
}

default boolean circuitStreamMetadata() {
return true;
}

String dbUrl();

String dbUserName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import apache.rocketmq.controller.v1.CloseStreamReply;
import apache.rocketmq.controller.v1.CloseStreamRequest;
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.CommitStreamObjectReply;
import apache.rocketmq.controller.v1.CommitStreamObjectRequest;
import apache.rocketmq.controller.v1.CommitWALObjectReply;
import apache.rocketmq.controller.v1.CommitWALObjectRequest;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
Expand All @@ -36,13 +32,9 @@
import apache.rocketmq.controller.v1.ListTopicsRequest;
import apache.rocketmq.controller.v1.OpenStreamReply;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.PrepareS3ObjectsReply;
import apache.rocketmq.controller.v1.PrepareS3ObjectsRequest;
import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
Expand Down Expand Up @@ -91,14 +83,6 @@ public interface ControllerClient extends Closeable {

CompletableFuture<ListOpenStreamsReply> listOpenStreams(String target, ListOpenStreamsRequest request);

CompletableFuture<TrimStreamReply> trimStream(String target, TrimStreamRequest request);

CompletableFuture<PrepareS3ObjectsReply> prepareS3Objects(String target, PrepareS3ObjectsRequest request);

CompletableFuture<CommitStreamObjectReply> commitStreamObject(String target, CommitStreamObjectRequest request);

CompletableFuture<CommitWALObjectReply> commitWALObject(String target, CommitWALObjectRequest request);

CompletableFuture<Topic> updateTopic(String target, UpdateTopicRequest request);

void terminateNode(String target, TerminateNodeRequest request, StreamObserver<TerminateNodeReply> observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import apache.rocketmq.controller.v1.S3StreamObject;
import apache.rocketmq.controller.v1.S3WALObject;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.TerminationStage;
Expand All @@ -48,8 +46,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;

public interface MetadataStore extends Closeable {

Expand All @@ -67,6 +65,8 @@ public interface MetadataStore extends Closeable {
*/
SqlSession openSession();

SqlSessionFactory sessionFactory();

ControllerClient controllerClient();

void addBrokerNode(Node node);
Expand Down Expand Up @@ -177,35 +177,17 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s
*/
CompletableFuture<Void> onQueueClosed(long topicId, int queueId);

CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset);

CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch, int nodeId);

CompletableFuture<Void> closeStream(long streamId, long streamEpoch, int nodeId);

CompletableFuture<List<StreamMetadata>> listOpenStreams(int nodeId);

CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes);

CompletableFuture<Void> commitWalObject(S3WALObject walObject, List<S3StreamObject> streamObjects,
List<Long> compactedObjects);

CompletableFuture<Void> commitStreamObject(S3StreamObject streamObject,
List<Long> compactedObjects) throws ControllerException;

CompletableFuture<List<S3WALObject>> listWALObjects();

CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long startOffset, long endOffset, int limit);

CompletableFuture<List<S3StreamObject>> listStreamObjects(long streamId, long startOffset, long endOffset,
int limit);

CompletableFuture<Long> getConsumerOffset(long consumerGroupId, long topicId, int queueId);

CompletableFuture<String> addressOfNode(int nodeId);

CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObjects(long streamId, long startOffset,
long endOffset, int limit);

boolean maintainLeadershipWithSharedLock(SqlSession session);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.CommitOffsetReply;
import apache.rocketmq.controller.v1.CommitOffsetRequest;
import apache.rocketmq.controller.v1.CommitStreamObjectReply;
import apache.rocketmq.controller.v1.CommitStreamObjectRequest;
import apache.rocketmq.controller.v1.CommitWALObjectReply;
import apache.rocketmq.controller.v1.CommitWALObjectRequest;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
Expand Down Expand Up @@ -58,15 +54,11 @@
import apache.rocketmq.controller.v1.NotifyMessageQueuesAssignableRequest;
import apache.rocketmq.controller.v1.OpenStreamReply;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.PrepareS3ObjectsReply;
import apache.rocketmq.controller.v1.PrepareS3ObjectsRequest;
import apache.rocketmq.controller.v1.ReassignMessageQueueReply;
import apache.rocketmq.controller.v1.ReassignMessageQueueRequest;
import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.UpdateGroupReply;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicReply;
Expand Down Expand Up @@ -744,104 +736,6 @@ public void onFailure(@Nonnull Throwable t) {
return future;
}

@Override
public CompletableFuture<TrimStreamReply> trimStream(String target,
TrimStreamRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<TrimStreamReply> future = new CompletableFuture<>();
Futures.addCallback(stub.trimStream(request), new FutureCallback<>() {
@Override
public void onSuccess(TrimStreamReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<PrepareS3ObjectsReply> prepareS3Objects(String target,
PrepareS3ObjectsRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<PrepareS3ObjectsReply> future = new CompletableFuture<>();
Futures.addCallback(stub.prepareS3Objects(request), new FutureCallback<>() {
@Override
public void onSuccess(PrepareS3ObjectsReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<CommitStreamObjectReply> commitStreamObject(String target,
CommitStreamObjectRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<CommitStreamObjectReply> future = new CompletableFuture<>();
Futures.addCallback(stub.commitStreamObject(request), new FutureCallback<>() {
@Override
public void onSuccess(CommitStreamObjectReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<CommitWALObjectReply> commitWALObject(String target, CommitWALObjectRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}
CompletableFuture<CommitWALObjectReply> future = new CompletableFuture<>();
Futures.addCallback(stub.commitWALObject(request), new FutureCallback<>() {
@Override
public void onSuccess(CommitWALObjectReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ControllerServiceGrpc.ControllerServiceFutureStub> entry : stubs.entrySet()) {
Expand Down
Loading