From 556549af897d4a3efa5b8cd9c74a479018f990ae Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Sat, 17 Aug 2024 18:04:07 -0500 Subject: [PATCH] fix(lease/shell): use cluster check for lease shell fixes issue when provider restarts and tenant attempts to shell into the deployment as it takes time for provider to load all leases into deployment manager refs akash-network/support#87 Signed-off-by: Artur Troian --- cluster/kube/client.go | 29 +++++---- cluster/kube/client_test.go | 3 +- cmd/provider-services/cmd/run.go | 11 ++-- cmd/provider-services/cmd/shell.go | 1 + gateway/rest/router.go | 89 ++++++++++++++------------- integration/deployment_update_test.go | 2 +- integration/e2e_test.go | 20 +++--- 7 files changed, 82 insertions(+), 73 deletions(-) diff --git a/cluster/kube/client.go b/cluster/kube/client.go index f2eca84c4..540557a28 100644 --- a/cluster/kube/client.go +++ b/cluster/kube/client.go @@ -625,7 +625,6 @@ func (c *client) ServiceStatus(ctx context.Context, lid mtypes.LeaseID, name str } // Get manifest definition from CRD - c.log.Debug("Pulling manifest from CRD", "lease-ns", builder.LidNS(lid)) mani, err := wrapKubeCall("manifests-list", func() (*crd.Manifest, error) { return c.ac.AkashV2beta2().Manifests(c.ns).Get(ctx, builder.LidNS(lid), metav1.GetOptions{}) }) @@ -636,22 +635,30 @@ func (c *client) ServiceStatus(ctx context.Context, lid mtypes.LeaseID, name str } var result *ctypes.ServiceStatus - isDeployment := true - for _, svc := range mani.Spec.Group.Services { - if svc.Name == name { - if params := svc.Params; params != nil { - for _, param := range params.Storage { - if param.Mount != "" { - isDeployment = false - } - } - } + var svc *crd.ManifestService + for i, s := range mani.Spec.Group.Services { + if s.Name == name { + svc = &mani.Spec.Group.Services[i] break } } + if svc == nil { + return nil, kubeclienterrors.ErrNoServiceForLease + } + + isDeployment := true + if params := svc.Params; params != nil { + for _, param := range params.Storage { + if param.Mount != "" { + isDeployment = false + break + } + } + } + if isDeployment { c.log.Debug("get deployment", "lease-ns", builder.LidNS(lid), "name", name) deployment, err := wrapKubeCall("deployments-get", func() (*appsv1.Deployment, error) { diff --git a/cluster/kube/client_test.go b/cluster/kube/client_test.go index e61771a92..f0b7f1b7a 100644 --- a/cluster/kube/client_test.go +++ b/cluster/kube/client_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" @@ -368,7 +367,7 @@ func TestServiceStatusNoDeployment(t *testing.T) { clientInterface := clientForTest(t, []runtime.Object{lns, svc}, []runtime.Object{mani}) status, err := clientInterface.ServiceStatus(context.Background(), lid, serviceName) - require.True(t, kerrors.IsNotFound(err)) + require.ErrorIs(t, err, kubeclienterrors.ErrNoServiceForLease) require.Nil(t, status) } diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index b3bbaab5e..eeadd13a7 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -866,12 +866,11 @@ func createClusterClient(ctx context.Context, log log.Logger, _ *cobra.Command) func showErrorToUser(err error) error { // If the error has a complete message associated with it then show it - // terr := &gwrest.ClientResponseError{} - // errors.As(err, terr) - clientResponseError, ok := err.(gwrest.ClientResponseError) - if ok && 0 != len(clientResponseError.Message) { - _, _ = fmt.Fprintf(os.Stderr, "provider error messsage:\n%v\n", clientResponseError.Message) - err = nil + terr := &gwrest.ClientResponseError{} + + if errors.As(err, terr) && len(terr.Message) != 0 { + _, _ = fmt.Fprintf(os.Stderr, "provider error messsage:\n%v\n", terr.Message) + err = terr } return err diff --git a/cmd/provider-services/cmd/shell.go b/cmd/provider-services/cmd/shell.go index 77abd61db..3e70767b2 100644 --- a/cmd/provider-services/cmd/shell.go +++ b/cmd/provider-services/cmd/shell.go @@ -210,5 +210,6 @@ func doLeaseShell(cmd *cobra.Command, args []string) error { if err != nil { return showErrorToUser(err) } + return nil } diff --git a/gateway/rest/router.go b/gateway/rest/router.go index 75f1a54b4..8b19606c8 100644 --- a/gateway/rest/router.go +++ b/gateway/rest/router.go @@ -310,20 +310,6 @@ func leaseShellHandler(log log.Logger, mclient pmanifest.Client, cclient cluster return func(rw http.ResponseWriter, req *http.Request) { leaseID := requestLeaseID(req) - // check if deployment actually exists in the first place before querying kubernetes - active, err := mclient.IsActive(req.Context(), leaseID.DeploymentID()) - if err != nil { - log.Error("failed checking deployment activity", "err", err) - rw.WriteHeader(http.StatusInternalServerError) - return - } - - if !active { - log.Info("no active deployment", "lease", leaseID) - rw.WriteHeader(http.StatusNotFound) - return - } - localLog := log.With("lease", leaseID.String(), "action", "shell") vars := req.URL.Query() @@ -343,7 +329,7 @@ func leaseShellHandler(log log.Logger, mclient pmanifest.Client, cclient cluster return } tty := vars.Get("tty") - if 0 == len(tty) { + if len(tty) == 0 { localLog.Error("missing parameter tty") rw.WriteHeader(http.StatusBadRequest) return @@ -351,14 +337,14 @@ func leaseShellHandler(log log.Logger, mclient pmanifest.Client, cclient cluster isTty := tty == "1" service := vars.Get("service") - if 0 == len(service) { + if len(service) == 0 { localLog.Error("missing parameter service") rw.WriteHeader(http.StatusBadRequest) return } stdin := vars.Get("stdin") - if 0 == len(stdin) { + if len(stdin) == 0 { localLog.Error("missing parameter stdin") rw.WriteHeader(http.StatusBadRequest) return @@ -411,40 +397,57 @@ func leaseShellHandler(log log.Logger, mclient pmanifest.Client, cclient cluster go leaseShellWebsocketHandler(localLog, wg, shellWs, stdinPipeOut, terminalSizeUpdate) } + responseData := leaseShellResponse{} l := &sync.Mutex{} - stdout := wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeStdout, l) - stderr := wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeStderr, l) - - subctx, subcancel := context.WithCancel(req.Context()) - wg.Add(1) - go leaseShellPingHandler(subctx, wg, shellWs) - var stdinForExec io.Reader - if connectStdin { - stdinForExec = stdinPipeIn - } - result, err := cclient.Exec(subctx, leaseID, service, podIndex, cmd, stdinForExec, stdout, stderr, isTty, tsq) - subcancel() + resultWriter := wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeResult, l) - responseData := leaseShellResponse{} - var resultWriter io.Writer encodeData := true - resultWriter = wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeResult, l) - - if result != nil { - responseData.ExitCode = result.ExitCode() - localLog.Info("lease shell completed", "exitcode", result.ExitCode()) - } else { - if cluster.ErrorIsOkToSendToClient(err) { + status, err := cclient.ServiceStatus(req.Context(), leaseID, service) + if err != nil { + if cluster.ErrorIsOkToSendToClient(err) || errors.Is(err, kubeclienterrors.ErrNoServiceForLease) { responseData.Message = err.Error() } else { - resultWriter = wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeFailure, l) - // Don't return errors like this to the client, they could contain information - // that should not be let out - encodeData = false + http.Error(rw, err.Error(), http.StatusInternalServerError) + } + } - localLog.Error("lease exec failed", "err", err) + if err == nil && status.ReadyReplicas == 0 { + err = errors.New("no active replicase for service") + responseData.Message = err.Error() + } + + if err == nil { + stdout := wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeStdout, l) + stderr := wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeStderr, l) + + subctx, subcancel := context.WithCancel(req.Context()) + wg.Add(1) + go leaseShellPingHandler(subctx, wg, shellWs) + + var stdinForExec io.Reader + if connectStdin { + stdinForExec = stdinPipeIn + } + result, err := cclient.Exec(subctx, leaseID, service, podIndex, cmd, stdinForExec, stdout, stderr, isTty, tsq) + subcancel() + + if result != nil { + responseData.ExitCode = result.ExitCode() + + localLog.Info("lease shell completed", "exitcode", result.ExitCode()) + } else { + if cluster.ErrorIsOkToSendToClient(err) { + responseData.Message = err.Error() + } else { + resultWriter = wsutil.NewWsWriterWrapper(shellWs, LeaseShellCodeFailure, l) + // Don't return errors like this to the client, they could contain information + // that should not be let out + encodeData = false + + localLog.Error("lease exec failed", "err", err) + } } } diff --git a/integration/deployment_update_test.go b/integration/deployment_update_test.go index c4fb21db3..03bb0b892 100644 --- a/integration/deployment_update_test.go +++ b/integration/deployment_update_test.go @@ -256,5 +256,5 @@ func (s *E2EDeploymentUpdate) TestE2ELeaseShell() { _, err = ptestutil.TestLeaseShell(leaseShellCtx, s.validator.ClientCtx.WithOutputFormat("json"), extraArgs, lID, 99, false, false, "notaservice", "/bin/echo", "/foo") require.Error(s.T(), err) - require.Regexp(s.T(), ".*no such service exists with that name.*", err.Error()) + require.Regexp(s.T(), ".*no service for that lease.*", err.Error()) } diff --git a/integration/e2e_test.go b/integration/e2e_test.go index 3a545086f..ad4aec9f5 100644 --- a/integration/e2e_test.go +++ b/integration/e2e_test.go @@ -572,18 +572,18 @@ func getKubernetesIP() string { func TestIntegrationTestSuite(t *testing.T) { integrationTestOnly(t) - suite.Run(t, new(E2EContainerToContainer)) - suite.Run(t, new(E2EAppNodePort)) + // suite.Run(t, new(E2EContainerToContainer)) + // suite.Run(t, new(E2EAppNodePort)) suite.Run(t, new(E2EDeploymentUpdate)) - suite.Run(t, new(E2EApp)) - suite.Run(t, new(E2EPersistentStorageDefault)) - suite.Run(t, new(E2EPersistentStorageBeta2)) - suite.Run(t, new(E2EPersistentStorageDeploymentUpdate)) + // suite.Run(t, new(E2EApp)) + // suite.Run(t, new(E2EPersistentStorageDefault)) + // suite.Run(t, new(E2EPersistentStorageBeta2)) + // suite.Run(t, new(E2EPersistentStorageDeploymentUpdate)) // suite.Run(t, new(E2EStorageClassRam)) - suite.Run(t, new(E2EMigrateHostname)) - suite.Run(t, new(E2EJWTServer)) - suite.Run(t, new(E2ECustomCurrency)) - suite.Run(t, &E2EIPAddress{IntegrationTestSuite{ipMarketplace: true}}) + // suite.Run(t, new(E2EMigrateHostname)) + // suite.Run(t, new(E2EJWTServer)) + // suite.Run(t, new(E2ECustomCurrency)) + // suite.Run(t, &E2EIPAddress{IntegrationTestSuite{ipMarketplace: true}}) } func (s *IntegrationTestSuite) waitForBlocksCommitted(height int) error {