[Refactor] remove unnecessary get shard info api
Signed-off-by: starrocks-xupeng <xupeng@starrocks.com>
starrocks-xupeng committed Mar 6, 2025
1 parent 1378562 commit 1697eff
Showing 11 changed files with 54 additions and 90 deletions.
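In short, callers no longer fetch a ShardInfo themselves or pass an onlyPrimary flag; they hand StarOSAgent a shard id and a worker group id and let it resolve the shard's nodes internally. Below is a minimal illustrative sketch, not part of this commit, of what a call site looks like after the refactor, assuming the post-refactor signatures shown in the diff; the wrapper class and method names are made up for illustration.

import com.starrocks.common.StarRocksException;
import com.starrocks.lake.StarOSAgent;

import java.util.List;

class ShardNodeLookupSketch {
    // Primary compute node for a shard: the ShardInfo lookup now happens inside
    // StarOSAgent, so callers pass the shard id and worker group id directly
    // instead of fetching a ShardInfo first.
    static long primaryNodeId(StarOSAgent agent, long shardId, long workerGroupId)
            throws StarRocksException {
        return agent.getPrimaryComputeNodeIdByShard(shardId, workerGroupId);
    }

    // All compute node ids serving a shard; the old onlyPrimary flag is gone and
    // the method now keeps only PRIMARY replicas internally.
    static List<Long> allNodeIds(StarOSAgent agent, long shardId, long workerGroupId)
            throws StarRocksException {
        return agent.getAllNodeIdsByShard(shardId, workerGroupId);
    }
}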
11 changes: 9 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/lake/LakeTablet.java
@@ -125,9 +125,13 @@ public Set<Long> getBackendIds(long warehouseId) {
         try {
             List<Long> ids = GlobalStateMgr.getCurrentState().getWarehouseMgr()
                     .getAllComputeNodeIdsAssignToTablet(warehouseId, this);
-            return new HashSet<Long>(ids);
+            if (ids == null) {
+                return Sets.newHashSet();
+            } else {
+                return new HashSet<Long>(ids);
+            }
         } catch (Exception e) {
-            LOG.warn("Failed to get backends by shard. tablet id: {}", getId(), e);
+            LOG.warn("Failed to get backends by shard id: {}", getId(), e);
             return Sets.newHashSet();
         }
     }
@@ -153,6 +157,9 @@ public void getQueryableReplicas(List<Replica> allQuerableReplicas, List<Replica
                                       long visibleVersion, long localBeId, int schemaHash, long warehouseId) {
         List<Long> computeNodeIds = GlobalStateMgr.getCurrentState().getWarehouseMgr()
                 .getAllComputeNodeIdsAssignToTablet(warehouseId, this);
+        if (computeNodeIds == null) {
+            return;
+        }
         for (long backendId : computeNodeIds) {
             Replica replica = new Replica(getId(), backendId, visibleVersion, schemaHash, getDataSize(true),
                     getRowCount(visibleVersion), NORMAL, -1, visibleVersion);
31 changes: 19 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
@@ -614,12 +614,8 @@ private Optional<Long> getOrUpdateNodeIdByWorkerInfo(WorkerInfo info) {
         return result;
     }
 
-    public long getPrimaryComputeNodeIdByShard(long shardId) throws StarRocksException {
-        return getPrimaryComputeNodeIdByShard(shardId, DEFAULT_WORKER_GROUP_ID);
-    }
-
     public long getPrimaryComputeNodeIdByShard(long shardId, long workerGroupId) throws StarRocksException {
-        List<Long> backendIds = getAllNodeIdsByShard(shardId, workerGroupId, true);
+        List<Long> backendIds = getAllNodeIdsByShard(shardId, workerGroupId);
         if (backendIds.isEmpty()) {
             // If BE stops, routine load task may catch UserException during load plan,
             // and the job state will be changed to PAUSED.
@@ -631,22 +627,33 @@ public long getPrimaryComputeNodeIdByShard(long shardId, long workerGroupId) throws StarRocksException {
         return backendIds.iterator().next();
     }
 
-    public List<Long> getAllNodeIdsByShard(long shardId, long workerGroupId, boolean onlyPrimary)
+    public long getPrimaryComputeNodeIdByShard(ShardInfo shardInfo) throws StarRocksException {
+        List<Long> ids = getAllNodeIdsByShard(shardInfo);
+        if (ids.isEmpty()) {
+            // If BE stops, routine load task may catch UserException during load plan,
+            // and the job state will be changed to PAUSED.
+            // The job will automatically recover from PAUSED to RUNNING if the error code is REPLICA_FEW_ERR
+            // when all BEs become alive.
+            throw new StarRocksException(InternalErrorCode.REPLICA_FEW_ERR,
+                    "Failed to get primary backend. shard id: " + shardInfo.getShardId());
+        }
+        return ids.iterator().next();
+    }
+
+    public List<Long> getAllNodeIdsByShard(long shardId, long workerGroupId)
             throws StarRocksException {
         try {
             ShardInfo shardInfo = getShardInfo(shardId, workerGroupId);
-            return getAllNodeIdsByShard(shardInfo, onlyPrimary);
+            return getAllNodeIdsByShard(shardInfo);
         } catch (StarClientException e) {
             throw new StarRocksException(e);
         }
     }
 
-    public List<Long> getAllNodeIdsByShard(ShardInfo shardInfo, boolean onlyPrimary) {
+    private List<Long> getAllNodeIdsByShard(ShardInfo shardInfo) {
         List<ReplicaInfo> replicas = shardInfo.getReplicaInfoList();
-        if (onlyPrimary) {
-            replicas = replicas.stream().filter(x -> x.getReplicaRole() == ReplicaRole.PRIMARY)
-                    .collect(Collectors.toList());
-        }
+        replicas = replicas.stream().filter(x -> x.getReplicaRole() == ReplicaRole.PRIMARY)
+                .collect(Collectors.toList());
         List<Long> nodeIds = new ArrayList<>();
         replicas.stream()
                 .map(x -> getOrUpdateNodeIdByWorkerInfo(x.getWorkerInfo()))
8 changes: 5 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
@@ -53,9 +53,11 @@ private Utils() {
     }
 
     public static Long chooseNodeId(ShardInfo shardInfo) {
-        List<Long> ids = GlobalStateMgr.getCurrentState().getStarOSAgent().getAllNodeIdsByShard(shardInfo, true);
-        if (!ids.isEmpty()) {
-            return ids.iterator().next();
+        try {
+            Long id = GlobalStateMgr.getCurrentState().getStarOSAgent().getPrimaryComputeNodeIdByShard(shardInfo);
+            return id;
+        } catch (StarRocksException e) {
+            // do nothing
         }
         try {
             return GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()
@@ -131,7 +131,7 @@ protected void prepareAndSendSnapshotTasks(Database db) {
             MaterializedIndex index = part.getDefaultPhysicalPartition().getIndex(idChain.getIdxId());
             tablet = (LakeTablet) index.getTablet(idChain.getTabletId());
             Long computeNodeId = GlobalStateMgr.getCurrentState().getWarehouseMgr()
-                    .getComputeNodeId(WarehouseManager.DEFAULT_WAREHOUSE_NAME, tablet);
+                    .getComputeNodeId(WarehouseManager.DEFAULT_WAREHOUSE_ID, tablet);
 
             LakeTableSnapshotInfo info = new LakeTableSnapshotInfo(db.getId(), idChain.getTblId(),
                     idChain.getPartId(), idChain.getIdxId(), idChain.getTabletId(),
@@ -16,8 +16,6 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import com.staros.client.StarClientException;
-import com.staros.proto.ShardInfo;
 import com.staros.util.LockCloseable;
 import com.starrocks.common.DdlException;
 import com.starrocks.common.ErrorCode;
@@ -179,54 +177,20 @@ public Long getComputeNodeId(Long warehouseId, LakeTablet tablet) {
         try {
             long workerGroupId = selectWorkerGroupInternal(warehouseId)
                     .orElse(StarOSAgent.DEFAULT_WORKER_GROUP_ID);
-            ShardInfo shardInfo = GlobalStateMgr.getCurrentState().getStarOSAgent()
-                    .getShardInfo(tablet.getShardId(), workerGroupId);
-
-            Long nodeId;
-            List<Long> ids = GlobalStateMgr.getCurrentState().getStarOSAgent()
-                    .getAllNodeIdsByShard(shardInfo, true);
-            if (!ids.isEmpty()) {
-                nodeId = ids.iterator().next();
-                return nodeId;
-            } else {
-                return null;
-            }
-        } catch (StarClientException e) {
-            return null;
-        }
-    }
-
-    public Long getComputeNodeId(String warehouseName, LakeTablet tablet) {
-        Warehouse warehouse = getWarehouse(warehouseName);
-
-        try {
-            long workerGroupId = selectWorkerGroupInternal(warehouse.getId()).orElse(StarOSAgent.DEFAULT_WORKER_GROUP_ID);
-            ShardInfo shardInfo = GlobalStateMgr.getCurrentState().getStarOSAgent()
-                    .getShardInfo(tablet.getShardId(), workerGroupId);
-
-            Long nodeId;
-            List<Long> ids = GlobalStateMgr.getCurrentState().getStarOSAgent()
-                    .getAllNodeIdsByShard(shardInfo, true);
-            if (!ids.isEmpty()) {
-                nodeId = ids.iterator().next();
-                return nodeId;
-            } else {
-                return null;
-            }
-        } catch (StarClientException e) {
+            long nodeId = GlobalStateMgr.getCurrentState().getStarOSAgent()
+                    .getPrimaryComputeNodeIdByShard(tablet.getShardId(), workerGroupId);
+            return nodeId;
+        } catch (StarRocksException e) {
             return null;
         }
     }
 
     public List<Long> getAllComputeNodeIdsAssignToTablet(Long warehouseId, LakeTablet tablet) {
         try {
             long workerGroupId = selectWorkerGroupInternal(warehouseId).orElse(StarOSAgent.DEFAULT_WORKER_GROUP_ID);
-            ShardInfo shardInfo = GlobalStateMgr.getCurrentState().getStarOSAgent()
-                    .getShardInfo(tablet.getShardId(), workerGroupId);
-
             return GlobalStateMgr.getCurrentState().getStarOSAgent()
-                    .getAllNodeIdsByShard(shardInfo, true);
-        } catch (StarClientException e) {
+                    .getAllNodeIdsByShard(tablet.getShardId(), workerGroupId);
+        } catch (StarRocksException e) {
             return null;
         }
     }
@@ -166,7 +166,7 @@ public void setUp() throws Exception {
 
         new Expectations() {
             {
-                starOSAgent.getPrimaryComputeNodeIdByShard(anyLong);
+                starOSAgent.getPrimaryComputeNodeIdByShard(anyLong, anyLong);
                 minTimes = 0;
                 result = 1;

@@ -218,9 +218,10 @@ public void testGetPrimaryComputeNodeIdByShard(@Mocked StarClient client) throws
         workerToNode.put(1L, 2L);
         Deencapsulation.setField(starosAgent, "workerToNode", workerToNode);
 
-        Assert.assertEquals(2, starosAgent.getPrimaryComputeNodeIdByShard(shardId));
+        Assert.assertEquals(2, starosAgent.getPrimaryComputeNodeIdByShard(shardId, StarOSAgent.DEFAULT_WORKER_GROUP_ID));
         StarRocksException exception =
-                Assert.assertThrows(StarRocksException.class, () -> starosAgent.getPrimaryComputeNodeIdByShard(shardId));
+                Assert.assertThrows(StarRocksException.class, () -> starosAgent.getPrimaryComputeNodeIdByShard(shardId,
+                        StarOSAgent.DEFAULT_WORKER_GROUP_ID));
         Assert.assertEquals(InternalErrorCode.REPLICA_FEW_ERR, exception.getErrorCode());
     }

@@ -269,6 +270,6 @@ public void allocatePartitionFilePathInfo() {
     }
 
     private Set<Long> getBackendIdsByShard(long shardId, long workerGroupId) throws StarRocksException {
-        return new HashSet<Long>(starosAgent.getAllNodeIdsByShard(shardId, workerGroupId, false));
+        return new HashSet<Long>(starosAgent.getAllNodeIdsByShard(shardId, workerGroupId));
     }
 }
23 changes: 9 additions & 14 deletions fe/fe-core/src/test/java/com/starrocks/lake/StarOSAgentTest.java
@@ -17,7 +17,6 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.staros.client.StarClient;
 import com.staros.client.StarClientException;
 import com.staros.proto.CreateShardGroupInfo;
@@ -56,10 +55,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class StarOSAgentTest {
     private StarOSAgent starosAgent;
@@ -397,11 +394,11 @@ public void testGetBackendByShard() throws StarClientException, StarRocksException {
                 .setWorkerInfo(WorkerInfo.newBuilder().setWorkerId(1L).setWorkerState(WorkerState.ON).build())
                 .build();
         ReplicaInfo replica2 = ReplicaInfo.newBuilder()
-                .setReplicaRole(ReplicaRole.SECONDARY)
+                .setReplicaRole(ReplicaRole.PRIMARY)
                 .setWorkerInfo(WorkerInfo.newBuilder().setWorkerId(2L).setWorkerState(WorkerState.ON).build())
                 .build();
         ReplicaInfo replica3 = ReplicaInfo.newBuilder()
-                .setReplicaRole(ReplicaRole.SECONDARY)
+                .setReplicaRole(ReplicaRole.PRIMARY)
                 .setWorkerInfo(WorkerInfo.newBuilder().setWorkerId(3L).setWorkerState(WorkerState.OFF).build())
                 .build();
         List<ReplicaInfo> replicas = Lists.newArrayList(replica1, replica2, replica3);
@@ -452,19 +449,21 @@ public String getIpPort() {
 
         ExceptionChecker.expectThrowsWithMsg(StarRocksException.class,
                 "Failed to get primary backend. shard id: 10",
-                () -> starosAgent.getPrimaryComputeNodeIdByShard(10L));
+                () -> starosAgent.getPrimaryComputeNodeIdByShard(10L, StarOSAgent.DEFAULT_WORKER_GROUP_ID));
 
-        Assert.assertEquals(Sets.newHashSet(), getBackendIdsByShard(10L, 0));
+        Assert.assertEquals(Lists.newArrayList(),
+                starosAgent.getAllNodeIdsByShard(10L, StarOSAgent.DEFAULT_WORKER_GROUP_ID));
 
         workerToNode.put(1L, 10001L);
         workerToNode.put(2L, 10002L);
         workerToNode.put(3L, 10003L);
         Deencapsulation.setField(starosAgent, "workerToNode", workerToNode);
 
         Deencapsulation.setField(starosAgent, "serviceId", "1");
-        Assert.assertEquals(10001L, starosAgent.getPrimaryComputeNodeIdByShard(10L));
-        Assert.assertEquals(Sets.newHashSet(10001L, 10002L, 10003L),
-                getBackendIdsByShard(10L, 0));
+        Assert.assertEquals(10001L, starosAgent.getPrimaryComputeNodeIdByShard(10L,
+                StarOSAgent.DEFAULT_WORKER_GROUP_ID));
+        Assert.assertEquals(Lists.newArrayList(10001L, 10002L, 10003L),
+                starosAgent.getAllNodeIdsByShard(10L, StarOSAgent.DEFAULT_WORKER_GROUP_ID));
     }

@Test
@@ -769,10 +768,6 @@ public WorkerGroupDetailInfo updateWorkerGroup(String serviceId, long groupId,
                 () -> starosAgent.updateWorkerGroup(123, 1, "aaa"));
     }
 
-    private Set<Long> getBackendIdsByShard(long shardId, long workerGroupId) throws StarRocksException {
-        return new HashSet<Long>(starosAgent.getAllNodeIdsByShard(shardId, workerGroupId, false));
-    }
-
     @Test
     public void testListShard() throws StarClientException, DdlException {
         ShardInfo shardInfo = ShardInfo.newBuilder().setShardId(1000L).build();
@@ -244,11 +244,6 @@ public List<Long> createShards(int numShards, FilePathInfo pathInfo, FileCacheIn
     public void deleteShardGroup(List<Long> groupIds) {
     }
 
-    @Override
-    public long getPrimaryComputeNodeIdByShard(long shardId) throws StarRocksException {
-        return workers.isEmpty() ? -1 : workers.get((int) (shardId % workers.size())).backendId;
-    }
-
     @Override
     public long getPrimaryComputeNodeIdByShard(long shardId, long workerGroupId) throws StarRocksException {
         return workers.isEmpty() ? -1 : workers.get((int) (shardId % workers.size())).backendId;
@@ -70,8 +70,6 @@ public void testWarehouseNotExist() {
                 () -> mgr.getAllComputeNodeIds("a"));
         ExceptionChecker.expectThrowsWithMsg(ErrorReportException.class, "Warehouse id: 1 not exist.",
                 () -> mgr.getAllComputeNodeIds(1L));
-        ExceptionChecker.expectThrowsWithMsg(ErrorReportException.class, "Warehouse name: a not exist.",
-                () -> mgr.getComputeNodeId("a", null));
         ExceptionChecker.expectThrowsWithMsg(ErrorReportException.class, "Warehouse id: 1 not exist.",
                 () -> mgr.getComputeNodeId(1L, null));
     }
@@ -90,11 +90,6 @@ public void setAllComputeNodeIds(List<Long> computeNodeIds) {
         warehouseIdToComputeNodeIds.put(DEFAULT_WAREHOUSE_ID, computeNodeIds);
     }
 
-    @Override
-    public Long getComputeNodeId(String warehouseName, LakeTablet tablet) {
-        return computeNodeId;
-    }
-
     public void setComputeNodeId(Long computeNodeId) {
         this.computeNodeId = computeNodeId;
     }
