Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: lookup topic/queue owner node id #437

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,13 @@ CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObjects(lon

ConcurrentMap<Integer, BrokerNode> allNodes();

/**
* Query owner node id of the given topic/queue.
*
* @param topicId Topic ID
* @param queueId Queue ID
* @return Owner node id if found in cache; 0 otherise.
*/
int ownerNode(long topicId, int queueId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ public ConcurrentMap<Integer, BrokerNode> allNodes() {
return nodes;
}

@Override
public int ownerNode(long topicId, int queueId) {
return topicManager.ownerNode(topicId, queueId);
}

@Override
public CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer srcNodeId, Integer dstNodeId,
AssignmentStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,22 @@ long createStream(StreamMapper streamMapper, long topicId, int queueId, Long gro
private void notifyOnResourceChange(Set<Integer> nodes) {

}

public int ownerNode(long topicId, int queueId) {
Map<Integer, QueueAssignment> map = assignmentCache.byTopicId(topicId);
if (null != map) {
QueueAssignment assignment = map.get(queueId);
if (null != assignment) {
switch (assignment.getStatus()) {
case ASSIGNMENT_STATUS_ASSIGNED -> {
return assignment.getDstNodeId();
}
case ASSIGNMENT_STATUS_YIELDING -> {
return assignment.getSrcNodeId();
}
}
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,9 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
long endOffset, int limit) {
return metadataStore.listObjects(streamId, startOffset, endOffset, limit);
}

@Override
public int ownerNode(long topicId, int queueId) {
return metadataStore.ownerNode(topicId, queueId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,6 @@ CompletableFuture<Void> commitWalObject(S3WALObject walObject, List<S3StreamObje
*/
CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObjects(long streamId, long startOffset,
long endOffset, int limit);

int ownerNode(long topicId, int queueId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,9 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
long endOffset, int limit) {
return null;
}

@Override
public int ownerNode(long topicId, int queueId) {
return 0;
}
}