From 905f629f4a350cefac45edfe13b5cdd507373662 Mon Sep 17 00:00:00 2001 From: Matt Brock Date: Wed, 25 Sep 2024 14:50:24 -0700 Subject: [PATCH] [v14] Displaying mode and controls to additional participants (#46450) (#46923) * Merge conflict resolution * Followup merge conflict issue --- integrations/operator/main.go | 1 - lib/client/api.go | 2 -- lib/client/kubesession.go | 2 -- lib/kube/proxy/sess.go | 16 +++++++++++----- lib/srv/sess.go | 34 ++++++++++++++++++++++++++-------- lib/web/apiserver_test.go | 6 +++--- 6 files changed, 40 insertions(+), 21 deletions(-) diff --git a/integrations/operator/main.go b/integrations/operator/main.go index 898fa7ec2409b..73a590a8539e4 100644 --- a/integrations/operator/main.go +++ b/integrations/operator/main.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" diff --git a/lib/client/api.go b/lib/client/api.go index d176920aaac76..25211825118db 100644 --- a/lib/client/api.go +++ b/lib/client/api.go @@ -1999,8 +1999,6 @@ func (tc *TeleportClient) Join(ctx context.Context, mode types.SessionParticipan } } - fmt.Printf("Joining session with participant mode: %v. \n\n", mode) - // running shell with a given session means "join" it: err = nc.RunInteractiveShell(ctx, mode, session, beforeStart) return trace.Wrap(err) diff --git a/lib/client/kubesession.go b/lib/client/kubesession.go index ea6217402a106..4d75b9cd50b6c 100644 --- a/lib/client/kubesession.go +++ b/lib/client/kubesession.go @@ -61,8 +61,6 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT TLSClientConfig: tlsConfig, } - fmt.Printf("Joining session with participant mode: %v. \n\n", mode) - ws, resp, err := dialer.DialContext(ctx, joinEndpoint, nil) if resp != nil && resp.Body != nil { defer resp.Body.Close() diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index 0a98df0d07cc7..a9ff022e50196 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -493,7 +493,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params s.io.OnReadError = s.disconnectPartyOnErr s.BroadcastMessage("Creating session with ID: %v...", id.String()) - s.BroadcastMessage(srv.SessionControlsInfoBroadcast) go func() { if _, open := <-s.io.TerminateNotifier(); open { @@ -969,7 +968,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { return trace.Wrap(err) } - // we only want to emit the session.join when someone tries to join a session via + // We only want to emit the session.join when someone tries to join a session via // tsh kube join and not when the original session owner terminal streams are // connected to the Kubernetes session. if emitJoinEvent { @@ -980,6 +979,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { if _, err := p.Client.stdoutStream().Write(recentWrites); err != nil { s.log.Warnf("Failed to write history to client: %v.", err) } + s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode) // increment the party track waitgroup. // It is decremented when session.leave() finishes its execution. @@ -1004,10 +1004,17 @@ func (s *session) join(p *party, emitJoinEvent bool) error { if p.Mode == types.SessionPeerMode { s.io.AddReader(stringID, p.Client.stdinStream()) } - s.io.AddWriter(stringID, p.Client.stdoutStream()) - s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode) + // Send the participant mode and controls to the additional participant + if p.Ctx.User.GetName() != s.ctx.User.GetName() { + err := srv.MsgParticipantCtrls(p.Client.stdoutStream(), p.Mode) + if err != nil { + s.log.Errorf("Could not send intro message to participant: %v", err) + } + } + + // Allow the moderator to force terminate the session if p.Mode == types.SessionModeratorMode { s.weakEventsWaiter.Add(1) go func() { @@ -1080,7 +1087,6 @@ func (s *session) join(p *party, emitJoinEvent bool) error { s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStateRunning) } } - return nil } diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 927848f97d4cf..94abc49b08c53 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -17,6 +17,7 @@ limitations under the License. package srv import ( + "bytes" "context" "encoding/json" "errors" @@ -65,10 +66,6 @@ const ( PresenceMaxDifference = time.Minute ) -// SessionControlsInfoBroadcast is sent in tandem with session creation -// to inform any joining users about the session controls. -const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)" - const ( // sessionRecordingWarningMessage is sent when the session recording is // going to be disabled. @@ -85,6 +82,21 @@ var serverSessions = prometheus.NewGauge( }, ) +func MsgParticipantCtrls(w io.Writer, m types.SessionParticipantMode) error { + var modeCtrl bytes.Buffer + modeCtrl.WriteString(fmt.Sprintf("\r\nTeleport > Joining session with participant mode: %s\r\n", string(m))) + modeCtrl.WriteString("Teleport > Controls\r\n") + modeCtrl.WriteString("Teleport > - CTRL-C: Leave the session\r\n") + if m == types.SessionModeratorMode { + modeCtrl.WriteString("Teleport > - t: Forcefully terminate the session\r\n") + } + _, err := w.Write(modeCtrl.Bytes()) + if err != nil { + return fmt.Errorf("could not write bytes: %w", err) + } + return nil +} + // SessionRegistry holds a map of all active sessions on a given // SSH server type SessionRegistry struct { @@ -1295,7 +1307,6 @@ func (s *session) startInteractive(ctx context.Context, scx *ServerContext, p *p s.io.AddReader("reader", inReader) s.io.AddWriter(sessionRecorderID, utils.WriteCloserWithContext(scx.srv.Context(), s.Recorder())) s.BroadcastMessage("Creating session with ID: %v", s.id) - s.BroadcastMessage(SessionControlsInfoBroadcast) if err := s.startTerminal(ctx, scx); err != nil { return trace.Wrap(err) @@ -1950,16 +1961,23 @@ func (s *session) addParty(p *party, mode types.SessionParticipantMode) error { s.participants[p.id] = p p.ctx.AddCloser(p) - // Write last chunk (so the newly joined parties won't stare at a blank - // screen). + // Write last chunk (so the newly joined parties won't stare at a blank screen). if _, err := p.Write(s.io.GetRecentHistory()); err != nil { return trace.Wrap(err) } + s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode) // Register this party as one of the session writers (output will go to it). s.io.AddWriter(string(p.id), p) - s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode) + // Send the participant mode and controls to the additional participant + if s.login != p.login { + err := MsgParticipantCtrls(p.ch, mode) + if err != nil { + s.log.Errorf("Could not send intro message to participant: %v", err) + } + } + s.log.Infof("New party %v joined the session with participant mode: %v.", p.String(), p.mode) if mode == types.SessionPeerMode { diff --git a/lib/web/apiserver_test.go b/lib/web/apiserver_test.go index bf8b1f6e3402c..2c77ee51a0e3a 100644 --- a/lib/web/apiserver_test.go +++ b/lib/web/apiserver_test.go @@ -7590,7 +7590,7 @@ func waitForOutput(r io.Reader, substr string) error { timeoutCh := time.After(10 * time.Second) var prev string - out := make([]byte, int64(len(substr)*2)) + out := make([]byte, int64(len(substr)*3)) for { select { case <-timeoutCh: @@ -9530,7 +9530,7 @@ func TestModeratedSession(t *testing.T) { peerStream := terminal.NewStream(ctx, terminal.StreamConfig{WS: peerWS}) - require.NoError(t, waitForOutput(peerStream, "Teleport > User foo joined the session with participant mode: peer.")) + require.NoError(t, waitForOutput(peerStream, "Teleport > Waiting for required participants...")) moderator := s.authPack(t, "bar", moderatorRole.GetName()) moderatorWS, _, err := s.makeTerminal(t, moderator, withSessionID(sess.ID), withParticipantMode(types.SessionModeratorMode)) @@ -9619,7 +9619,7 @@ func TestModeratedSessionWithMFA(t *testing.T) { peerStream := terminal.NewStream(ctx, terminal.StreamConfig{WS: peerWS}) - require.NoError(t, waitForOutput(peerStream, "Teleport > User foo joined the session with participant mode: peer.")) + require.NoError(t, waitForOutput(peerStream, "Teleport > Waiting for required participants...")) moderatorWS, _, err := s.makeTerminal(t, moderator, withSessionID(sess.ID), withParticipantMode(types.SessionModeratorMode)) require.NoError(t, err)