Skip to content

Commit

Permalink
enhance: [2.5] Add management api to check querycoord balance status (#…
Browse files Browse the repository at this point in the history
…37784) (#39909)

issue: #37783
pr: #37784

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 authored Feb 19, 2025
1 parent 04175d8 commit 3c2d8c1
Show file tree
Hide file tree
Showing 15 changed files with 1,304 additions and 818 deletions.
11 changes: 11 additions & 0 deletions internal/distributed/querycoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,17 @@ func (c *Client) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRe
})
}

func (c *Client) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest, opts ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.CheckBalanceStatusResponse, error) {
return client.CheckBalanceStatus(ctx, req)
})
}

func (c *Client) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
Expand Down
3 changes: 3 additions & 0 deletions internal/distributed/querycoord/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func Test_NewClient(t *testing.T) {

r39, err := client.CheckQueryNodeDistribution(ctx, nil)
retCheck(retNotNil, r39, err)

r40, err := client.CheckBalanceStatus(ctx, nil)
retCheck(retNotNil, r40, err)
}

client.(*Client).grpcClient = &mock.GRPCClientBase[querypb.QueryCoordClient]{
Expand Down
4 changes: 4 additions & 0 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ func (s *Server) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRe
return s.queryCoord.ResumeBalance(ctx, req)
}

func (s *Server) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) {
return s.queryCoord.CheckBalanceStatus(ctx, req)
}

func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) {
return s.queryCoord.SuspendNode(ctx, req)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/distributed/querycoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,14 @@ func Test_NewServer(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})

t.Run("CheckBalanceStatus", func(t *testing.T) {
req := &querypb.CheckBalanceStatusRequest{}
mqc.EXPECT().CheckBalanceStatus(mock.Anything, req).Return(&querypb.CheckBalanceStatusResponse{Status: merr.Success()}, nil)
resp, err := server.CheckBalanceStatus(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("SuspendNode", func(t *testing.T) {
req := &querypb.SuspendNodeRequest{}
mqc.EXPECT().SuspendNode(mock.Anything, req).Return(merr.Success(), nil)
Expand Down
1 change: 1 addition & 0 deletions internal/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (

RouteSuspendQueryCoordBalance = "/management/querycoord/balance/suspend"
RouteResumeQueryCoordBalance = "/management/querycoord/balance/resume"
RouteQueryCoordBalanceStatus = "/management/querycoord/balance/status"
RouteTransferSegment = "/management/querycoord/transfer/segment"
RouteTransferChannel = "/management/querycoord/transfer/channel"

Expand Down
59 changes: 59 additions & 0 deletions internal/mocks/mock_querycoord.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions internal/mocks/mock_querycoord_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions internal/proxy/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func RegisterMgrRoute(proxy *Proxy) {
Path: management.RouteCheckQueryNodeDistribution,
HandlerFunc: proxy.CheckQueryNodeDistribution,
})
management.Register(&management.Handler{
Path: management.RouteQueryCoordBalanceStatus,
HandlerFunc: proxy.CheckQueryCoordBalanceStatus,
})
})
}

Expand Down Expand Up @@ -249,6 +253,29 @@ func (node *Proxy) ResumeQueryCoordBalance(w http.ResponseWriter, req *http.Requ
w.Write([]byte(`{"msg": "OK"}`))
}

func (node *Proxy) CheckQueryCoordBalanceStatus(w http.ResponseWriter, req *http.Request) {
resp, err := node.queryCoord.CheckBalanceStatus(req.Context(), &querypb.CheckBalanceStatusRequest{
Base: commonpbutil.NewMsgBase(),
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to check balance status, %s"}`, err.Error())))
return
}

if !merr.Ok(resp.GetStatus()) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to check balance status, %s"}`, resp.GetStatus().GetReason())))
return
}
w.WriteHeader(http.StatusOK)
balanceStatus := "suspended"
if resp.IsActive {
balanceStatus = "active"
}
w.Write([]byte(fmt.Sprintf(`{"msg": "OK", "status": "%v"}`, balanceStatus)))
}

func (node *Proxy) SuspendQueryNode(w http.ResponseWriter, req *http.Request) {
err := req.ParseForm()
if err != nil {
Expand Down
62 changes: 62 additions & 0 deletions internal/proxy/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,68 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() {
})
}

func (s *ProxyManagementSuite) TestCheckBalanceStatus() {
s.Run("normal", func() {
s.SetupTest()
defer s.TearDownTest()

s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(&querypb.CheckBalanceStatusResponse{
Status: merr.Success(),
IsActive: true,
}, nil).Times(1)

req, err := http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
s.Require().NoError(err)

recorder := httptest.NewRecorder()
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
s.Equal(http.StatusOK, recorder.Code)
s.Equal(`{"msg": "OK", "status": "active"}`, recorder.Body.String())

s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(&querypb.CheckBalanceStatusResponse{
Status: merr.Success(),
IsActive: false,
}, nil).Times(1)

req, err = http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
s.Require().NoError(err)
recorder = httptest.NewRecorder()
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
s.Equal(http.StatusOK, recorder.Code)
s.Equal(`{"msg": "OK", "status": "suspended"}`, recorder.Body.String())
})

s.Run("return_error", func() {
s.SetupTest()
defer s.TearDownTest()

s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))

req, err := http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
s.Require().NoError(err)

recorder := httptest.NewRecorder()
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
s.Equal(http.StatusInternalServerError, recorder.Code)
})

s.Run("return_failure", func() {
s.SetupTest()
defer s.TearDownTest()

req, err := http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
s.Require().NoError(err)

s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(&querypb.CheckBalanceStatusResponse{
Status: merr.Status(merr.ErrServiceNotReady),
}, nil)

recorder := httptest.NewRecorder()
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
s.Equal(http.StatusInternalServerError, recorder.Code)
})
}

func (s *ProxyManagementSuite) TestSuspendQueryNode() {
s.Run("normal", func() {
s.SetupTest()
Expand Down
8 changes: 8 additions & 0 deletions internal/querycoordv2/ops_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,18 @@ func (suite *OpsServiceSuite) TestSuspendAndResumeBalance() {
suite.True(merr.Ok(resp))
suite.False(suite.checkerController.IsActive(utils.BalanceChecker))

resp1, err := suite.server.CheckBalanceStatus(ctx, &querypb.CheckBalanceStatusRequest{})
suite.NoError(err)
suite.Equal(false, resp1.GetIsActive())

resp, err = suite.server.ResumeBalance(ctx, &querypb.ResumeBalanceRequest{})
suite.NoError(err)
suite.True(merr.Ok(resp))
suite.True(suite.checkerController.IsActive(utils.BalanceChecker))

resp2, err := suite.server.CheckBalanceStatus(ctx, &querypb.CheckBalanceStatusRequest{})
suite.NoError(err)
suite.Equal(true, resp2.GetIsActive())
}

func (suite *OpsServiceSuite) TestSuspendAndResumeNode() {
Expand Down
27 changes: 27 additions & 0 deletions internal/querycoordv2/ops_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,33 @@ func (s *Server) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRe
return merr.Success(), nil
}

// CheckBalanceStatus checks whether balance is active or suspended
func (s *Server) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) {
log := log.Ctx(ctx)
log.Info("CheckBalanceStatus request received")

errMsg := "failed to check balance status"
if err := merr.CheckHealthy(s.State()); err != nil {
log.Warn(errMsg, zap.Error(err))
return &querypb.CheckBalanceStatusResponse{
Status: merr.Status(err),
}, nil
}

isActive, err := s.checkerController.IsActive(utils.BalanceChecker)
if err != nil {
log.Warn(errMsg, zap.Error(err))
return &querypb.CheckBalanceStatusResponse{
Status: merr.Status(err),
}, nil
}

return &querypb.CheckBalanceStatusResponse{
Status: merr.Success(),
IsActive: isActive,
}, nil
}

// suspend node from resource operation, for given node, suspend load_segment/sub_channel operations
func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx)
Expand Down
4 changes: 4 additions & 0 deletions internal/util/mock/grpc_querycoord_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (m *GrpcQueryCoordClient) ResumeBalance(ctx context.Context, req *querypb.R
return &commonpb.Status{}, m.Err
}

func (m *GrpcQueryCoordClient) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest, opts ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error) {
return &querypb.CheckBalanceStatusResponse{}, m.Err
}

func (m *GrpcQueryCoordClient) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
Expand Down
Loading

0 comments on commit 3c2d8c1

Please # to comment.