Migrate kubernetes server heartbeats to inventory control stream
Adds support for kubernetes server heartbeats to the auth server and
converts agents to use the new mechanism if they detect that auth
supports it. Additionally, cleanup of kubernetes servers is now
performed automatically by auth in response to a Goodbye message.

Updates #40917
rosstimothy committed Sep 1, 2024
1 parent 0034ef1 commit ac5edb4
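
The agent-side switch described in the commit message keys off the capability flags that auth now advertises in its DownstreamInventoryHello (see the lib/auth/auth.go hunk below). A minimal sketch of that check, using only the generated proto.DownstreamInventoryHello_SupportedCapabilities type from this diff; the package name and the way the agent obtains the hello are placeholders, not part of this change:

package agent // hypothetical package; the real agent plumbing is not part of this diff

import "github.com/gravitational/teleport/api/client/proto"

// useControlStreamForKube reports whether auth advertised support for kube
// server heartbeats over the inventory control stream. Older auth servers do
// not set this flag, so the agent keeps using its legacy announce path.
func useControlStreamForKube(caps *proto.DownstreamInventoryHello_SupportedCapabilities) bool {
	return caps != nil && caps.KubernetesHeartbeats
}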
Showing 13 changed files with 1,501 additions and 970 deletions.
1,932 changes: 997 additions & 935 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
@@ -2466,6 +2466,8 @@ message InventoryHeartbeat {
types.AppServerV3 AppServer = 2;
// DatabaseServer is a complete db server spec to be heartbeated.
types.DatabaseServerV3 DatabaseServer = 3;
// KubeServer is a complete kube server spec to be heartbeated.
types.KubernetesServerV3 KubernetesServer = 4;
}

// UpstreamInventoryGoodbye informs the upstream service that instance
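
In the generated Go code the new proto field becomes a KubernetesServer member on proto.InventoryHeartbeat. A rough sketch of the agent-side send follows; the sender interface is a hypothetical stand-in for the upstream control stream (its Send signature is only modeled on the downstream ics.Send(ctx, msg) call in lib/auth/auth.go below and is not taken from this diff):

package agent // hypothetical package

import (
	"context"

	"github.com/gravitational/teleport/api/client/proto"
	"github.com/gravitational/teleport/api/types"
)

// heartbeatKubeServer sends a complete kube server spec over the inventory
// control stream using the new KubernetesServer heartbeat field.
// sender is a hypothetical stand-in for the agent's upstream control stream.
func heartbeatKubeServer(
	ctx context.Context,
	sender interface {
		Send(ctx context.Context, hb proto.InventoryHeartbeat) error
	},
	srv *types.KubernetesServerV3,
) error {
	return sender.Send(ctx, proto.InventoryHeartbeat{
		KubernetesServer: srv, // field added by this change
	})
}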
12 changes: 7 additions & 5 deletions lib/auth/auth.go
@@ -4590,11 +4590,13 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
Version: teleport.Version,
ServerID: a.ServerID,
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
NodeHeartbeats: true,
AppHeartbeats: true,
DatabaseHeartbeats: true,
AppCleanup: true,
DatabaseCleanup: true,
NodeHeartbeats: true,
AppHeartbeats: true,
DatabaseHeartbeats: true,
AppCleanup: true,
DatabaseCleanup: true,
KubernetesHeartbeats: true,
KubernetesCleanup: true,
},
}
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {
135 changes: 133 additions & 2 deletions lib/inventory/controller.go
@@ -41,12 +41,17 @@ import (
// to the controller in order for it to be able to handle control streams.
type Auth interface {
UpsertNode(context.Context, types.Server) (*types.KeepAlive, error)

UpsertApplicationServer(context.Context, types.AppServer) (*types.KeepAlive, error)
DeleteApplicationServer(ctx context.Context, namespace, hostID, name string) error

UpsertDatabaseServer(context.Context, types.DatabaseServer) (*types.KeepAlive, error)
DeleteDatabaseServer(ctx context.Context, namespace, hostID, name string) error
KeepAliveServer(context.Context, types.KeepAlive) error

UpsertKubernetesServer(context.Context, types.KubeServer) (*types.KeepAlive, error)
DeleteKubernetesServer(ctx context.Context, hostID, name string) error

KeepAliveServer(context.Context, types.KeepAlive) error
UpsertInstance(ctx context.Context, instance types.Instance) error
}

@@ -82,6 +87,16 @@ const (
dbUpsertRetryOk testEvent = "db-upsert-retry-ok"
dbUpsertRetryErr testEvent = "db-upsert-retry-err"

kubeKeepAliveOk testEvent = "kube-keep-alive-ok"
kubeKeepAliveErr testEvent = "kube-keep-alive-err"
kubeKeepAliveDel testEvent = "kube-keep-alive-del"

kubeUpsertOk testEvent = "kube-upsert-ok"
kubeUpsertErr testEvent = "kube-upsert-err"

kubeUpsertRetryOk testEvent = "kube-upsert-retry-ok"
kubeUpsertRetryErr testEvent = "kube-upsert-retry-err"

instanceHeartbeatOk testEvent = "instance-heartbeat-ok"
instanceHeartbeatErr testEvent = "instance-heartbeat-err"

@@ -332,7 +347,11 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {

defer func() {
if handle.goodbye.GetDeleteResources() {
log.WithFields(log.Fields{"apps": len(handle.appServers), "dbs": len(handle.databaseServers)}).Debug("Cleaning up resources in response to instance termination")
log.WithFields(log.Fields{
"apps": len(handle.appServers),
"dbs": len(handle.databaseServers),
"kube": len(handle.kubernetesServers),
}).Debug("Cleaning up resources in response to instance termination")
for _, app := range handle.appServers {
if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err)
@@ -344,6 +363,12 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
log.Warnf("Failed to remove db server %q on termination: %v.", handle.Hello().ServerID, err)
}
}

for _, kube := range handle.kubernetesServers {
if err := c.auth.DeleteKubernetesServer(c.closeContext, kube.resource.GetHostID(), kube.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove kube server %q on termination: %v.", handle.Hello().ServerID, err)
}
}
}

c.instanceHBVariableDuration.Dec()
@@ -366,8 +391,13 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
c.onDisconnectFunc(constants.KeepAliveDatabase)
}

for range handle.kubernetesServers {
c.onDisconnectFunc(constants.KeepAliveKube)
}

clear(handle.appServers)
clear(handle.databaseServers)
clear(handle.kubernetesServers)
c.testEvent(handlerClose)
}()

@@ -547,6 +577,12 @@ func (c *Controller) handleHeartbeatMsg(handle *upstreamHandle, hb proto.Invento
}
}

if hb.KubernetesServer != nil {
if err := c.handleKubernetesServerHB(handle, hb.KubernetesServer); err != nil {
return trace.Wrap(err)
}
}

return nil
}

@@ -690,6 +726,55 @@ func (c *Controller) handleDatabaseServerHB(handle *upstreamHandle, databaseServ
}
return nil
}

func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetesServer *types.KubernetesServerV3) error {
// the auth layer verifies that a stream's hello message matches the identity and capabilities of the
// client cert. after that point it is our responsibility to ensure that heartbeated information is
// consistent with the identity and capabilities claimed in the initial hello.
if !handle.HasService(types.RoleKube) {
return trace.AccessDenied("control stream not configured to support kubernetes server heartbeats")
}
if kubernetesServer.GetHostID() != handle.Hello().ServerID {
return trace.AccessDenied("incorrect kubernetes server ID (expected %q, got %q)", handle.Hello().ServerID, kubernetesServer.GetHostID())
}

if handle.kubernetesServers == nil {
handle.kubernetesServers = make(map[dynamicResourceKey]*heartBeatInfo[*types.KubernetesServerV3])
}

kubeKey := dynamicResourceKey{hostID: kubernetesServer.GetHostID(), name: kubernetesServer.GetCluster().GetName()}

if _, ok := handle.kubernetesServers[kubeKey]; !ok {
c.onConnectFunc(constants.KeepAliveKube)
handle.kubernetesServers[kubeKey] = &heartBeatInfo[*types.KubernetesServerV3]{}
}

now := time.Now()

kubernetesServer.SetExpiry(now.Add(c.serverTTL).UTC())

lease, err := c.auth.UpsertKubernetesServer(c.closeContext, kubernetesServer)
if err == nil {
c.testEvent(kubeUpsertOk)
// store the new lease and reset retry state
srv := handle.kubernetesServers[kubeKey]
srv.lease = lease
srv.retryUpsert = false
srv.resource = kubernetesServer
} else {
c.testEvent(kubeUpsertErr)
log.Warnf("Failed to upsert kubernetes server %q on heartbeat: %v.", handle.Hello().ServerID, err)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
srv := handle.kubernetesServers[kubeKey]
srv.lease = nil
srv.retryUpsert = true
srv.resource = kubernetesServer
}
return nil
}

func (c *Controller) handleAgentMetadata(handle *upstreamHandle, m proto.UpstreamInventoryAgentMetadata) {
handle.SetAgentMetadata(m)

@@ -731,6 +816,10 @@ func (c *Controller) keepAliveServer(handle *upstreamHandle, now time.Time) erro
return trace.Wrap(err)
}

if err := c.keepAliveKubernetesServer(handle, now); err != nil {
return trace.Wrap(err)
}

return nil
}

@@ -826,6 +915,48 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti
return nil
}

func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time.Time) error {
for name, srv := range handle.kubernetesServers {
if srv.lease != nil {
lease := *srv.lease
lease.Expires = now.Add(c.serverTTL).UTC()
if err := c.auth.KeepAliveServer(c.closeContext, lease); err != nil {
c.testEvent(kubeKeepAliveErr)

srv.keepAliveErrs++
handle.kubernetesServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive kubernetes server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)

if shouldRemove {
c.testEvent(kubeKeepAliveDel)
delete(handle.kubernetesServers, name)
}
} else {
srv.keepAliveErrs = 0
c.testEvent(kubeKeepAliveOk)
}
} else if srv.retryUpsert {
srv.resource.SetExpiry(time.Now().Add(c.serverTTL).UTC())
lease, err := c.auth.UpsertKubernetesServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(kubeUpsertRetryErr)
log.Warnf("Failed to upsert kubernetes server %q on retry: %v.", handle.Hello().ServerID, err)
// since this is retry-specific logic, an error here means that upsert failed twice in
a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
return trace.Errorf("failed to upsert kubernetes server on retry: %v", err)
}
c.testEvent(kubeUpsertRetryOk)

srv.lease = lease
srv.retryUpsert = false
}
}

return nil
}

func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) error {
if handle.sshServer == nil {
return nil
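
The automatic cleanup mentioned in the commit message is driven from handleControlStream above: when handle.goodbye.GetDeleteResources() is set, auth deletes the instance's app, db, and (with this change) kube servers. Below is a sketch of the matching agent-side shutdown message; as before, the sender interface and its Send signature are assumptions, and only the DeleteResources flag comes from this diff:

package agent // hypothetical package

import (
	"context"

	"github.com/gravitational/teleport/api/client/proto"
)

// sendGoodbye asks auth to remove the resources this instance heartbeated
// (app, db, and now kube servers) when the agent shuts down for good.
// sender is the same hypothetical stand-in for the upstream control stream.
func sendGoodbye(ctx context.Context, sender interface {
	Send(ctx context.Context, msg proto.UpstreamInventoryGoodbye) error
}) error {
	return sender.Send(ctx, proto.UpstreamInventoryGoodbye{
		DeleteResources: true, // auth reacts by cleaning up this host's heartbeated servers
	})
}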