diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go
index cc824608..f72a1b4a 100644
--- a/pkg/controller/component/graphd_cluster.go
+++ b/pkg/controller/component/graphd_cluster.go
@@ -292,12 +292,16 @@ func (c *graphdCluster) syncNebulaClusterStatus(
 		return err
 	}
 	thriftPort := nc.GraphdComponent().GetPort(v1alpha1.GraphdPortNameThrift)
-	for i := range hostItems {
-		host := hostItems[i]
+	klog.Infof("Current graphd state: %v. Current number of replicas: %v", nc.Status.Graphd.Phase, pointer.Int32Deref(newReplicas, 0))
+	for _, host := range hostItems {
+		klog.Infof("Currently looking at host: %v with status %v", strings.Split(host.HostAddr.Host, ".")[0], host.Status)
 		if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort {
 			podName := strings.Split(host.HostAddr.Host, ".")[0]
 			ordinal := getPodOrdinal(podName)
 			if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Graphd.Replicas, 0) {
+				klog.Infof("graphd pod [%s/%s] has already been terminated by the sts. Skipping failover and/or removing from auto failover list", nc.Namespace, podName)
+				// delete is a no-op if FailureHosts or podName is nil
+				delete(nc.Status.Graphd.FailureHosts, podName)
 				continue
 			}
 			if nc.Status.Graphd.FailureHosts == nil {
diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go
index 0dbc1a5c..4a76a1f5 100644
--- a/pkg/controller/component/metad_cluster.go
+++ b/pkg/controller/component/metad_cluster.go
@@ -257,10 +257,16 @@ func (c *metadCluster) syncNebulaClusterStatus(nc *v1alpha1.NebulaCluster, oldWo
 		return err
 	}
 	thriftPort := nc.MetadComponent().GetPort(v1alpha1.MetadPortNameThrift)
-	for i := range hostItems {
-		host := hostItems[i]
+	for _, host := range hostItems {
 		if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort {
 			podName := strings.Split(host.HostAddr.Host, ".")[0]
+			ordinal := getPodOrdinal(podName)
+			if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Metad.Replicas, 0) {
+				klog.Infof("metad pod [%s/%s] has already been terminated by the sts. Skipping failover and/or removing from auto failover list", nc.Namespace, podName)
+				// delete is a no-op if FailureHosts or podName is nil
+				delete(nc.Status.Metad.FailureHosts, podName)
+				continue
+			}
 			if nc.Status.Metad.FailureHosts == nil {
 				nc.Status.Metad.FailureHosts = make(map[string]v1alpha1.FailureHost)
 			}
diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go
index 177d636e..d836f433 100644
--- a/pkg/controller/component/storaged_cluster.go
+++ b/pkg/controller/component/storaged_cluster.go
@@ -335,12 +335,14 @@ func (c *storagedCluster) syncNebulaClusterStatus(
 		return err
 	}
 	thriftPort := nc.StoragedComponent().GetPort(v1alpha1.StoragedPortNameThrift)
-	for i := range hostItems {
-		host := hostItems[i]
+	for _, host := range hostItems {
 		if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort {
 			podName := strings.Split(host.HostAddr.Host, ".")[0]
 			ordinal := getPodOrdinal(podName)
 			if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) {
+				klog.Infof("storaged pod [%s/%s] has already been terminated by the sts. Skipping failover and/or removing from auto failover list", nc.Namespace, podName)
+				// delete is a no-op if FailureHosts or podName is nil
+				delete(nc.Status.Storaged.FailureHosts, podName)
 				continue
 			}
 			if nc.Status.Storaged.FailureHosts == nil {
diff --git a/pkg/controller/component/storaged_updater.go b/pkg/controller/component/storaged_updater.go
index 72e227bb..cc55e6ad 100644
--- a/pkg/controller/component/storaged_updater.go
+++ b/pkg/controller/component/storaged_updater.go
@@ -467,10 +467,20 @@ func (s *storagedUpdater) balanceLeader(mc nebula.MetaInterface, nc *v1alpha1.Ne
 		if time.Now().Before(lastBalancedTime.Add(BalanceLeaderInterval * time.Second)) {
 			return utilerrors.ReconcileErrorf("partition leader is balancing")
 		}
+
+		balanced, err := mc.IsLeaderBalanced(space.Name)
+		if err != nil {
+			return utilerrors.ReconcileErrorf("failed to check if the leader is balanced for space %s: %v", space.Name, err)
+		}
+
+		if balanced {
+			nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
+			continue
+		}
+
 		if err := mc.BalanceLeader(*space.Id.SpaceID); err != nil {
 			return err
 		}
-		nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
 		nc.Status.Storaged.LastBalancedTime = &metav1.Time{Time: time.Now()}
 		return utilerrors.ReconcileErrorf("space %d need to be synced", *space.Id.SpaceID)
 	}
diff --git a/pkg/nebula/meta_client.go b/pkg/nebula/meta_client.go
index e1bf9c36..d7bac272 100644
--- a/pkg/nebula/meta_client.go
+++ b/pkg/nebula/meta_client.go
@@ -53,6 +53,7 @@ type (
 		GetSpaceLeaderHosts(space []byte) ([]string, error)
 		GetLeaderCount(leaderHost string) (int, error)
 		BalanceStatus(jobID int32, spaceID nebula.GraphSpaceID) error
+		IsLeaderBalanced(spaceName []byte) (bool, error)
 		BalanceLeader(spaceID nebula.GraphSpaceID) error
 		BalanceData(spaceID nebula.GraphSpaceID) (int32, error)
 		BalanceDataInZone(spaceID nebula.GraphSpaceID) (int32, error)
@@ -356,6 +357,24 @@ func (m *metaClient) BalanceLeader(spaceID nebula.GraphSpaceID) error {
 	return nil
 }
 
+func (m *metaClient) IsLeaderBalanced(spaceName []byte) (bool, error) {
+	hosts, err := m.ListHosts(meta.ListHostType_ALLOC)
+	if err != nil {
+		return false, err
+	}
+
+	for _, host := range hosts {
+		if host.Status != meta.HostStatus_ONLINE {
+			continue
+		}
+		if host.LeaderParts[(string)(spaceName)] == nil {
+			return false, nil
+		}
+	}
+
+	return true, nil
+}
+
 func (m *metaClient) runAdminJob(req *meta.AdminJobReq) (int32, error) {
 	resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
 		resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))
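// NOTE: illustrative sketch only, not part of the patch above. It mirrors the
// logic of the new metaClient.IsLeaderBalanced using simplified stand-in types;
// hostItem, hostStatus, and the isLeaderBalanced helper below are hypothetical,
// not the generated nebula meta types. The idea it demonstrates: a space is
// considered leader-balanced only if every ONLINE host already holds at least
// one leader partition for that space; offline hosts are ignored.
package main

import "fmt"

type hostStatus int

const (
	statusOnline hostStatus = iota
	statusOffline
)

// hostItem is a pared-down stand-in for the meta service's host entry.
type hostItem struct {
	Status      hostStatus
	LeaderParts map[string][]int32 // space name -> leader partition IDs
}

// isLeaderBalanced reports whether every online host leads at least one
// partition of the given space.
func isLeaderBalanced(hosts []hostItem, spaceName string) bool {
	for _, host := range hosts {
		if host.Status != statusOnline {
			continue // offline hosts do not count against balance
		}
		if host.LeaderParts[spaceName] == nil {
			return false // an online host with no leaders => not balanced yet
		}
	}
	return true
}

func main() {
	hosts := []hostItem{
		{Status: statusOnline, LeaderParts: map[string][]int32{"test": {1, 2}}},
		{Status: statusOnline, LeaderParts: map[string][]int32{"test": {3}}},
		{Status: statusOffline, LeaderParts: nil}, // ignored
	}
	fmt.Println(isLeaderBalanced(hosts, "test"))  // true
	fmt.Println(isLeaderBalanced(hosts, "other")) // false
}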