Skip to content

Commit

Permalink
[v14] Displaying mode and controls to additional participants (#46450) (
Browse files Browse the repository at this point in the history
#46923)

* Merge conflict resolution

* Followup merge conflict issue
  • Loading branch information
mvbrock authored Sep 25, 2024
1 parent ba26cc8 commit 905f629
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 21 deletions.
1 change: 0 additions & 1 deletion integrations/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down
2 changes: 0 additions & 2 deletions lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1999,8 +1999,6 @@ func (tc *TeleportClient) Join(ctx context.Context, mode types.SessionParticipan
}
}

fmt.Printf("Joining session with participant mode: %v. \n\n", mode)

// running shell with a given session means "join" it:
err = nc.RunInteractiveShell(ctx, mode, session, beforeStart)
return trace.Wrap(err)
Expand Down
2 changes: 0 additions & 2 deletions lib/client/kubesession.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT
TLSClientConfig: tlsConfig,
}

fmt.Printf("Joining session with participant mode: %v. \n\n", mode)

ws, resp, err := dialer.DialContext(ctx, joinEndpoint, nil)
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
Expand Down
16 changes: 11 additions & 5 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
s.io.OnReadError = s.disconnectPartyOnErr

s.BroadcastMessage("Creating session with ID: %v...", id.String())
s.BroadcastMessage(srv.SessionControlsInfoBroadcast)

go func() {
if _, open := <-s.io.TerminateNotifier(); open {
Expand Down Expand Up @@ -969,7 +968,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
return trace.Wrap(err)
}

// we only want to emit the session.join when someone tries to join a session via
// We only want to emit the session.join when someone tries to join a session via
// tsh kube join and not when the original session owner terminal streams are
// connected to the Kubernetes session.
if emitJoinEvent {
Expand All @@ -980,6 +979,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
if _, err := p.Client.stdoutStream().Write(recentWrites); err != nil {
s.log.Warnf("Failed to write history to client: %v.", err)
}
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

// increment the party track waitgroup.
// It is decremented when session.leave() finishes its execution.
Expand All @@ -1004,10 +1004,17 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
if p.Mode == types.SessionPeerMode {
s.io.AddReader(stringID, p.Client.stdinStream())
}

s.io.AddWriter(stringID, p.Client.stdoutStream())
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

// Send the participant mode and controls to the additional participant
if p.Ctx.User.GetName() != s.ctx.User.GetName() {
err := srv.MsgParticipantCtrls(p.Client.stdoutStream(), p.Mode)
if err != nil {
s.log.Errorf("Could not send intro message to participant: %v", err)
}
}

// Allow the moderator to force terminate the session
if p.Mode == types.SessionModeratorMode {
s.weakEventsWaiter.Add(1)
go func() {
Expand Down Expand Up @@ -1080,7 +1087,6 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStateRunning)
}
}

return nil
}

Expand Down
34 changes: 26 additions & 8 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package srv

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -65,10 +66,6 @@ const (
PresenceMaxDifference = time.Minute
)

// SessionControlsInfoBroadcast is sent in tandem with session creation
// to inform any joining users about the session controls.
const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)"

const (
// sessionRecordingWarningMessage is sent when the session recording is
// going to be disabled.
Expand All @@ -85,6 +82,21 @@ var serverSessions = prometheus.NewGauge(
},
)

func MsgParticipantCtrls(w io.Writer, m types.SessionParticipantMode) error {
var modeCtrl bytes.Buffer
modeCtrl.WriteString(fmt.Sprintf("\r\nTeleport > Joining session with participant mode: %s\r\n", string(m)))
modeCtrl.WriteString("Teleport > Controls\r\n")
modeCtrl.WriteString("Teleport > - CTRL-C: Leave the session\r\n")
if m == types.SessionModeratorMode {
modeCtrl.WriteString("Teleport > - t: Forcefully terminate the session\r\n")
}
_, err := w.Write(modeCtrl.Bytes())
if err != nil {
return fmt.Errorf("could not write bytes: %w", err)
}
return nil
}

// SessionRegistry holds a map of all active sessions on a given
// SSH server
type SessionRegistry struct {
Expand Down Expand Up @@ -1295,7 +1307,6 @@ func (s *session) startInteractive(ctx context.Context, scx *ServerContext, p *p
s.io.AddReader("reader", inReader)
s.io.AddWriter(sessionRecorderID, utils.WriteCloserWithContext(scx.srv.Context(), s.Recorder()))
s.BroadcastMessage("Creating session with ID: %v", s.id)
s.BroadcastMessage(SessionControlsInfoBroadcast)

if err := s.startTerminal(ctx, scx); err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -1950,16 +1961,23 @@ func (s *session) addParty(p *party, mode types.SessionParticipantMode) error {
s.participants[p.id] = p
p.ctx.AddCloser(p)

// Write last chunk (so the newly joined parties won't stare at a blank
// screen).
// Write last chunk (so the newly joined parties won't stare at a blank screen).
if _, err := p.Write(s.io.GetRecentHistory()); err != nil {
return trace.Wrap(err)
}
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode)

// Register this party as one of the session writers (output will go to it).
s.io.AddWriter(string(p.id), p)

s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode)
// Send the participant mode and controls to the additional participant
if s.login != p.login {
err := MsgParticipantCtrls(p.ch, mode)
if err != nil {
s.log.Errorf("Could not send intro message to participant: %v", err)
}
}

s.log.Infof("New party %v joined the session with participant mode: %v.", p.String(), p.mode)

if mode == types.SessionPeerMode {
Expand Down
6 changes: 3 additions & 3 deletions lib/web/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7590,7 +7590,7 @@ func waitForOutput(r io.Reader, substr string) error {
timeoutCh := time.After(10 * time.Second)

var prev string
out := make([]byte, int64(len(substr)*2))
out := make([]byte, int64(len(substr)*3))
for {
select {
case <-timeoutCh:
Expand Down Expand Up @@ -9530,7 +9530,7 @@ func TestModeratedSession(t *testing.T) {

peerStream := terminal.NewStream(ctx, terminal.StreamConfig{WS: peerWS})

require.NoError(t, waitForOutput(peerStream, "Teleport > User foo joined the session with participant mode: peer."))
require.NoError(t, waitForOutput(peerStream, "Teleport > Waiting for required participants..."))

moderator := s.authPack(t, "bar", moderatorRole.GetName())
moderatorWS, _, err := s.makeTerminal(t, moderator, withSessionID(sess.ID), withParticipantMode(types.SessionModeratorMode))
Expand Down Expand Up @@ -9619,7 +9619,7 @@ func TestModeratedSessionWithMFA(t *testing.T) {

peerStream := terminal.NewStream(ctx, terminal.StreamConfig{WS: peerWS})

require.NoError(t, waitForOutput(peerStream, "Teleport > User foo joined the session with participant mode: peer."))
require.NoError(t, waitForOutput(peerStream, "Teleport > Waiting for required participants..."))

moderatorWS, _, err := s.makeTerminal(t, moderator, withSessionID(sess.ID), withParticipantMode(types.SessionModeratorMode))
require.NoError(t, err)
Expand Down

0 comments on commit 905f629

Please # to comment.