diff --git a/client/src/components/WSTerminal.tsx b/client/src/components/WSTerminal.tsx index 2aad637..01dd714 100644 --- a/client/src/components/WSTerminal.tsx +++ b/client/src/components/WSTerminal.tsx @@ -45,8 +45,8 @@ class WSTerminal extends React.Component { this.rescale(); }) - this.props.msgManager.pub("request", constants.MSG_TREQUEST_WINSIZE ); - this.props.msgManager.pub("request", constants.MSG_TREQUEST_CACHE_MESSAGE ); + this.props.msgManager.pub("request", constants.MSG_TREQUEST_WINSIZE); + this.props.msgManager.pub("request", constants.MSG_TREQUEST_CACHE_CONTENT); window.addEventListener("resize", () => this.rescale()); this.rescale(); diff --git a/client/src/lib/constants.ts b/client/src/lib/constants.ts index 462abb9..927eea2 100644 --- a/client/src/lib/constants.ts +++ b/client/src/lib/constants.ts @@ -6,6 +6,6 @@ export const MSG_TREQUEST_WINSIZE = "RequestWinsize"; export const MSG_TCHAT = "Chat"; export const MSG_TREQUEST_CHAT = "RequestChat"; export const COLOR_LIST = ["#1f9c19", "#bd4719", "#d6c613", "#13cfc5", "#144eb3", "#6e10cc", "#db12d5"] -export const MSG_TREQUEST_CACHE_MESSAGE = "RequestCacheMessage"; +export const MSG_TREQUEST_CACHE_CONTENT = "RequestCacheContent"; export const MSG_TREQUEST_ROOM_INFO = "RequestRoomInfo"; export const MSG_TREQUEST_CACHE_CHAT = "RequestCacheChat"; diff --git a/tstream/cmd/server.go b/tstream/cmd/server.go index 356c1a2..d1848f0 100644 --- a/tstream/cmd/server.go +++ b/tstream/cmd/server.go @@ -8,11 +8,12 @@ package main import ( "flag" "fmt" + "log" + "os" + "github.com/qnkhuat/tstream/internal/cfg" "github.com/qnkhuat/tstream/internal/logging" "github.com/qnkhuat/tstream/pkg/server" - "log" - "os" ) func main() { diff --git a/tstream/cmd/tstream.go b/tstream/cmd/tstream.go index ab1b136..d46c628 100644 --- a/tstream/cmd/tstream.go +++ b/tstream/cmd/tstream.go @@ -20,22 +20,18 @@ import ( "bufio" "flag" "fmt" - "github.com/manifoldco/promptui" - "github.com/qnkhuat/tstream/internal/cfg" - "github.com/qnkhuat/tstream/internal/logging" - "github.com/qnkhuat/tstream/pkg/streamer" "log" "os" "os/user" "regexp" + + "github.com/manifoldco/promptui" + "github.com/qnkhuat/tstream/internal/cfg" + "github.com/qnkhuat/tstream/internal/logging" + "github.com/qnkhuat/tstream/pkg/streamer" ) func main() { - // Check if current process is under a tstream session - if len(os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID)) > 0 { - fmt.Printf("This terminal is currently running under session: %s\nType 'exit' to stop the current session!\n", os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID)) - os.Exit(1) - } logging.Config("/tmp/tstream.log", "STREAMER: ") flag.Usage = func() { @@ -55,6 +51,7 @@ func main() { os.Exit(0) return } + validateUsername := func(input string) error { var validUsername = regexp.MustCompile(`^[a-z][a-z0-9]*[._-]?[a-z0-9]+$`) if validUsername.MatchString(input) && len(input) > 3 && len(input) < 20 { @@ -78,11 +75,11 @@ func main() { } var username string - cfg, err := streamer.ReadCfg(streamer.CONFIG_PATH) + config, err := streamer.ReadCfg(streamer.CONFIG_PATH) if err != nil { username = u.Username } else { - username = cfg.Username + username = config.Username } promptUsername := promptui.Prompt{ @@ -98,6 +95,12 @@ func main() { if !*chat { // Start Streaming session + + // Check if current process is under a tstream session + if len(os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID)) > 0 { + fmt.Printf("This terminal is currently running under session: %s\nType 'exit' to stop the current session!\n", os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID)) + os.Exit(1) + } username, err = promptUsername.Run() if err != nil { os.Exit(1) @@ -125,7 +128,7 @@ func main() { os.Exit(1) } // Update config before start - cfg.Username = username + config.Username = username streamer.UpdateCfg(streamer.CONFIG_PATH, "Username", username) err = s.Start() @@ -134,13 +137,13 @@ func main() { } } else { var username = "" // also is sessionID - cfg, err := streamer.ReadCfg(streamer.CONFIG_PATH) + config, err := streamer.ReadCfg(streamer.CONFIG_PATH) if err != nil { fmt.Printf("No stream session detected\n") os.Exit(1) } else { - username = cfg.Username + username = config.Username } if username == "" { diff --git a/tstream/pkg/message/message.go b/tstream/pkg/message/message.go index 4edffaf..114e551 100644 --- a/tstream/pkg/message/message.go +++ b/tstream/pkg/message/message.go @@ -7,6 +7,7 @@ package message import ( "encoding/json" + "fmt" "time" ) @@ -16,11 +17,12 @@ import ( type MType string const ( - TWrite MType = "Write" - TChat MType = "Chat" - TClose MType = "Close" - TError MType = "Error" - TRoomInfo MType = "RoomInfo" + TWrite MType = "Write" + TChat MType = "Chat" + TClose MType = "Close" + TError MType = "Error" + TRoomInfo MType = "RoomInfo" + TClientInfo MType = "ClientInfo" // When streamer resize their termianl TWinsize MType = "Winsize" @@ -30,14 +32,20 @@ const ( TRequestRoomInfo MType = "RequestRoomInfo" - // When user first connect to server - TStreamerConnect MType = "StreamerConnect" - // when user first join the room, he can request for cached message to avoid idle screen - TRequestCacheMessage = "RequestCacheMessage" + TRequestCacheContent MType = "RequestCacheContent" + + // when user first join the room, he can request for cached chat to avoid idle chat screen + TRequestCacheChat MType = "RequestCacheChat" + + // Server can request client info to assign roles and verrification + TRequestClientInfo MType = "RequestClientInfo" - // when user first join the room, he can request for cache chat to avoid idle chat screen - TRequestCacheChat = "RequestCacheChat" + // Server will when this message if streamer is verified. Then streamer can proceed to start stream + TStreamerAuthorized MType = "StreamerAuthorized" + + // If websocket connection is illegal. server send this message to streamer then close connection + TStreamerUnauthorized MType = "StreamerUnauthorized" ) type Wrapper struct { @@ -84,6 +92,21 @@ type RoomInfo struct { Status RoomStatus } +// *** Client *** +type CRole string + +const ( + RStreamerChat CRole = "StreamerChat" // Chat for streamer + RStreamer CRole = "Streamer" // Send content to server + RViewer CRole = "Viewer" // View content + chat +) + +type ClientInfo struct { + Name string + Role CRole + Secret string +} + // *** Helper functions *** func Unwrap(buff []byte) (Wrapper, error) { @@ -92,6 +115,31 @@ func Unwrap(buff []byte) (Wrapper, error) { return obj, err } +// Unwrap the wrapper data as well +func Unwrap2(buff []byte) (MType, interface{}, error) { + msg := Wrapper{} + err := json.Unmarshal(buff, &msg) + if err != nil { + return msg.Type, nil, err + } + + var msgObj interface{} + switch msg.Type { + case TChat: + msgObj = Chat{} + err = json.Unmarshal(msg.Data, msgObj) + + case TRequestClientInfo: + msgObj = ClientInfo{} + err = json.Unmarshal(msg.Data, msgObj) + + default: + err = fmt.Errorf("Not implemented") + } + + return msg.Type, msgObj, err +} + func Wrap(msgType MType, msgObject interface{}) (Wrapper, error) { data, err := json.Marshal(msgObject) diff --git a/tstream/pkg/viewer/viewer.go b/tstream/pkg/room/client.go similarity index 63% rename from tstream/pkg/viewer/viewer.go rename to tstream/pkg/room/client.go index 5846e89..a460068 100644 --- a/tstream/pkg/viewer/viewer.go +++ b/tstream/pkg/room/client.go @@ -1,16 +1,19 @@ -package viewer +/* +Generic struct for a websocket connection +Currently used for Viewer and Chat +*/ +package room import ( "github.com/gorilla/websocket" + "github.com/qnkhuat/tstream/pkg/message" "log" "time" ) -var emptyByteArray []byte - -type Viewer struct { +type Client struct { conn *websocket.Conn - id string + role message.CRole // data go in Out channel will be send to user via websocket Out chan []byte @@ -21,30 +24,35 @@ type Viewer struct { alive bool } -func New(id string, conn *websocket.Conn) *Viewer { +func NewClient(role message.CRole, conn *websocket.Conn) *Client { out := make(chan []byte, 256) // buffer 256 send requests in := make(chan []byte, 256) // buffer 256 send requests - return &Viewer{ + return &Client{ conn: conn, - id: id, Out: out, In: in, + role: role, alive: true, } } -func (v *Viewer) Alive() bool { +func (v *Client) Role() message.CRole { + return v.role +} + +func (v *Client) Alive() bool { return v.alive } -func (v *Viewer) Start() { +func (v *Client) Start() { + // Receive message coroutine go func() { for { msg, ok := <-v.Out if ok { err := v.conn.WriteMessage(websocket.TextMessage, msg) if err != nil { - log.Printf("Failed to boardcast to %s. Closing connection", v.id) + log.Printf("Failed to boardcast to. Closing connection") v.Close() } } else { @@ -53,6 +61,7 @@ func (v *Viewer) Start() { } }() + // Send message coroutine for { _, msg, err := v.conn.ReadMessage() if err == nil { @@ -65,7 +74,7 @@ func (v *Viewer) Start() { } } -func (v *Viewer) Close() { +func (v *Client) Close() { v.conn.WriteControl(websocket.CloseMessage, emptyByteArray, time.Time{}) v.alive = false v.conn.Close() diff --git a/tstream/pkg/room/room.go b/tstream/pkg/room/room.go index 5548738..e4f0777 100644 --- a/tstream/pkg/room/room.go +++ b/tstream/pkg/room/room.go @@ -14,7 +14,6 @@ import ( "github.com/qnkhuat/tstream/internal/cfg" "github.com/qnkhuat/tstream/pkg/message" - "github.com/qnkhuat/tstream/pkg/viewer" ) var emptyByteArray []byte @@ -22,9 +21,9 @@ var emptyByteArray []byte type Room struct { lock sync.Mutex streamer *websocket.Conn - viewers map[string]*viewer.Viewer + chats map[string]*Client // Chat only connection + viewers map[string]*Client accViewers uint64 // accumulated viewers - chats map[string]*viewer.Viewer name string // also is streamerID id uint64 // Id in DB title string @@ -38,13 +37,15 @@ type Room struct { } func New(name, title, secret string) *Room { - viewers := make(map[string]*viewer.Viewer) + viewers := make(map[string]*Client) + chats := make(map[string]*Client) var buffer [][]byte var cacheChat [][]byte return &Room{ name: name, accViewers: 0, viewers: viewers, + chats: chats, lastActiveTime: time.Now(), startedTime: time.Now(), msgBuffer: buffer, @@ -63,7 +64,7 @@ func (r *Room) StartedTime() time.Time { return r.startedTime } -func (r *Room) Viewers() map[string]*viewer.Viewer { +func (r *Room) Viewers() map[string]*Client { return r.viewers } @@ -100,11 +101,11 @@ func (r *Room) Streamer() *websocket.Conn { } func (r *Room) AddStreamer(conn *websocket.Conn) error { - // TODO: hanlde case when streamer already existed if r.streamer != nil { r.streamer.Close() - //return fmt.Errorf("Streamer existed") } + // Verify streamer secret + log.Printf("New streamer") r.streamer = conn r.status = message.RStreaming @@ -151,7 +152,7 @@ func (r *Room) AddViewer(ID string, conn *websocket.Conn) error { return fmt.Errorf("Viewer %s existed", conn) } - v := viewer.New(ID, conn) + v := NewClient(message.RStreamer, conn) r.viewers[ID] = v go v.Start() @@ -200,20 +201,11 @@ func (r *Room) Start() { log.Printf("Unable to decode message: %s", err) continue } + if wrapperMsg.Type == message.TWinsize || wrapperMsg.Type == message.TWrite { r.lastActiveTime = time.Now() r.addMsgBuffer(msg) r.Broadcast(msg, []string{}) - } else if wrapperMsg.Type == message.TStreamerConnect { - msgObject := &message.StreamerConnect{} - err := json.Unmarshal(wrapperMsg.Data, msgObject) - if err != nil { - log.Printf("Failed to decode message: %s", err) - } else { - r.SetTitle(msgObject.Title) - } - log.Printf("Set title: %s", msgObject.Title) - } else { log.Printf("Unknown message type: %s", wrapperMsg.Type) } @@ -234,13 +226,13 @@ func (r *Room) addCachedChat(chat []byte) { r.cacheChat = append(r.cacheChat, chat) } -func (r *Room) ReadAndHandleViewerMessage(ID string) { - viewer, ok := r.viewers[ID] +func (r *Room) ReadAndHandleClientMessage(ID string) { + client, ok := r.viewers[ID] if !ok { return } for { - msg, _ := <-viewer.In + msg, _ := <-client.In msgObj, err := message.Unwrap(msg) if err != nil { @@ -254,12 +246,12 @@ func (r *Room) ReadAndHandleViewerMessage(ID string) { Cols: r.lastWinsize.Cols, }) payload, _ := json.Marshal(msg) - viewer.Out <- payload + client.Out <- payload - } else if msgObj.Type == message.TRequestCacheMessage { + } else if msgObj.Type == message.TRequestCacheContent { // Send msg buffer so viewers doesn't face a idle screen when first started for _, msg := range r.msgBuffer { - viewer.Out <- msg + client.Out <- msg } } else if msgObj.Type == message.TRequestRoomInfo { @@ -268,7 +260,7 @@ func (r *Room) ReadAndHandleViewerMessage(ID string) { if err == nil { payload, _ := json.Marshal(msg) - viewer.Out <- payload + client.Out <- payload } else { log.Printf("Error wrapping room info message: %s", err) } @@ -283,7 +275,7 @@ func (r *Room) ReadAndHandleViewerMessage(ID string) { if err == nil { payload, _ := json.Marshal(msg) if msgObj.Type == message.TRequestCacheChat { - viewer.Out <- payload + client.Out <- payload } else { r.Broadcast(payload, []string{ID}) } @@ -305,7 +297,6 @@ func (r *Room) Broadcast(msg []uint8, IDExclude []string) { } } - count := 0 for id, viewer := range r.viewers { // TODO: make this for loop run in parallel var isExcluded bool = false @@ -319,7 +310,6 @@ func (r *Room) Broadcast(msg []uint8, IDExclude []string) { } if viewer.Alive() { - count += 1 viewer.Out <- msg } else { log.Printf("Failed to boardcast to %s. Closing connection", id) diff --git a/tstream/pkg/server/handlers.go b/tstream/pkg/server/handlers.go index 224d510..9dd07ca 100644 --- a/tstream/pkg/server/handlers.go +++ b/tstream/pkg/server/handlers.go @@ -3,24 +3,20 @@ package server import ( "encoding/json" "fmt" + "log" + "net/http" + "strconv" + "strings" + "time" + "github.com/google/uuid" "github.com/gorilla/mux" "github.com/gorilla/schema" "github.com/gorilla/websocket" "github.com/qnkhuat/tstream/internal/cfg" "github.com/qnkhuat/tstream/pkg/message" - "log" - "net/http" - "strconv" - "strings" - "time" ) -func handleHealth(w http.ResponseWriter, r *http.Request) { - log.Printf("health check") - fmt.Fprintf(w, "I'm fine: %s\n", time.Now().String()) -} - // upgrade an http request to websocket var httpUpgrader = websocket.Upgrader{ ReadBufferSize: cfg.SERVER_READ_BUFFER_SIZE, @@ -30,8 +26,13 @@ var httpUpgrader = websocket.Upgrader{ }, } -var emptyByteArray []byte var decoder = schema.NewDecoder() +var emptyByteArray []byte + +const ( + // Time to wait before force close on connection. + CLOSE_GRACE_PERIOD = 2 * time.Second +) // Queries: // - status - string : Status of Room to query. Leave blank to get any @@ -43,6 +44,11 @@ type ListRoomQuery struct { Skip int `schema:"skip"` } +func handleHealth(w http.ResponseWriter, r *http.Request) { + log.Printf("health check") + fmt.Fprintf(w, "I'm fine: %s\n", time.Now().String()) +} + func (s *Server) handleListRooms(w http.ResponseWriter, r *http.Request) { var q ListRoomQuery err := decoder.Decode(&q, r.URL.Query()) @@ -83,7 +89,6 @@ func (s *Server) handleAddRoom(w http.ResponseWriter, r *http.Request) { err := decoder.Decode(&q, r.URL.Query()) if err != nil { log.Printf("Failed to decode queries:%s", err) - http.Error(w, err.Error(), 400) return } @@ -108,7 +113,7 @@ func (s *Server) handleAddRoom(w http.ResponseWriter, r *http.Request) { if _, ok := s.rooms[q.StreamerID]; !ok { s.NewRoom(q.StreamerID, q.Title, b.Secret) log.Printf("Added a room %s, %s", q.StreamerID, q.Title) - w.WriteHeader(200) + w.WriteHeader(http.StatusOK) return } else { if s.rooms[q.StreamerID].Secret() != b.Secret { @@ -143,34 +148,14 @@ func (s *Server) handleWSViewer(w http.ResponseWriter, r *http.Request) { room.AddViewer(viewerID, conn) // Handle incoming request from user - room.ReadAndHandleViewerMessage(viewerID) // Blocking call -} - -// Websocket connection for streamer to chat -func (s *Server) s.handleWSSChat(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - roomName := vars["roomName"] - log.Printf("Client %s entered room: %s", r.RemoteAddr, roomName) - room, ok := s.rooms[roomName] - if !ok { - fmt.Fprintf(w, "Room not existed") - log.Printf("Room :%s not existed", roomName) - return - } - conn, err := httpUpgrader.Upgrade(w, r, nil) - if err != nil { - log.Printf("Failed to upgrade to websocket: %s", err) - } - - viewerID := uuid.New().String() - room.AddViewer(viewerID, conn) - - // Handle incoming request from user - room.ReadAndHandleViewerMessage(viewerID) // Blocking call + room.ReadAndHandleClientMessage(viewerID) // Blocking call } // Websocket connection from streamer -// TODO: Add key checking to make sure only streamer can stream via this endpoint +// When connected server will wait for a clientinfo message from streamer +// Server then verify this client info's secret. +// If it matches => send back a message type authorized else send back unauthorized +// This has to be happen in the exact order func (s *Server) handleWSStreamer(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) roomName := vars["roomName"] @@ -185,12 +170,53 @@ func (s *Server) handleWSStreamer(w http.ResponseWriter, r *http.Request) { log.Printf("Failed to upgrade to websocket: %s", err) } defer conn.Close() - err = s.rooms[roomName].AddStreamer(conn) + + // Wait for client response + _, msg, err := conn.ReadMessage() + msgObj, err := message.Unwrap(msg) + if err != nil { - log.Printf("Failed to add streamer: %s", err) + log.Printf("Failed to decode info message") + conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second)) + time.Sleep(CLOSE_GRACE_PERIOD * time.Second) + return + } + + if msgObj.Type != message.TClientInfo { + log.Printf("Required client info message") + conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second)) + time.Sleep(CLOSE_GRACE_PERIOD * time.Second) + return } - s.rooms[roomName].Start() // Blocking call + clientInfo := &message.ClientInfo{} + err = json.Unmarshal(msgObj.Data, clientInfo) + if err != nil { + log.Printf("Failed to decode message") + conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second)) + time.Sleep(CLOSE_GRACE_PERIOD * time.Second) + return + } + + if clientInfo.Role == message.RStreamer && clientInfo.Secret != s.rooms[roomName].Secret() { + log.Printf("Unauthorized streamer connection") + sucessMsg, _ := message.Wrap(message.TStreamerUnauthorized, emptyByteArray) + payload, _ := json.Marshal(sucessMsg) + conn.WriteMessage(websocket.TextMessage, payload) + time.Sleep(CLOSE_GRACE_PERIOD * time.Second) + return + } else { + log.Printf("Authorized connection: Secret: %s", clientInfo.Secret) + sucessMsg, _ := message.Wrap(message.TStreamerAuthorized, emptyByteArray) + payload, _ := json.Marshal(sucessMsg) + conn.WriteMessage(websocket.TextMessage, payload) + + err = s.rooms[roomName].AddStreamer(conn) + if err != nil { + log.Printf("Failed to add streamer: %s", err) + } + s.rooms[roomName].Start() // Blocking call + } } // a > b => 1 diff --git a/tstream/pkg/server/server.go b/tstream/pkg/server/server.go index e69dd75..564e8a3 100644 --- a/tstream/pkg/server/server.go +++ b/tstream/pkg/server/server.go @@ -84,7 +84,6 @@ func (s *Server) Start() { // Add room router.HandleFunc("/api/room", s.handleAddRoom).Queries("streamerID", "{streamerID}", "title", "{title}").Methods("POST", "OPTIONS") router.HandleFunc("/ws/{roomName}/streamer", s.handleWSStreamer) // for streamers - router.HandleFunc("/ws/{roomName}/schat", s.handleWSSChat) // chat for streamers router.HandleFunc("/ws/{roomName}/viewer", s.handleWSViewer) // for viewers handler := cors.Default().Handler(router) @@ -96,7 +95,8 @@ func (s *Server) Start() { go s.repeatedlySyncDB(cfg.SERVER_SYNCDB_INTERVAL) if err := s.server.ListenAndServe(); err != nil { // blocking call - log.Panicf("Faield to start server: %s", err) + log.Panicf("Failed to start server: %s", err) + return } } @@ -119,7 +119,6 @@ func (s *Server) repeatedlyCleanRooms(interval, idleThreshold int) { } } -// TODO : clean inside the DB as well, DB should only store room that are STopped // Clean in active rooms or stopped one func (s *Server) scanAndCleanRooms(idleThreshold int) int { threshold := time.Duration(idleThreshold) * time.Second diff --git a/tstream/pkg/streamer/chat.go b/tstream/pkg/streamer/chat.go index 3745e7d..32d9bd3 100644 --- a/tstream/pkg/streamer/chat.go +++ b/tstream/pkg/streamer/chat.go @@ -2,9 +2,13 @@ package streamer import ( + "fmt" "github.com/gdamore/tcell/v2" "github.com/gorilla/websocket" "github.com/rivo/tview" + "log" + "net/url" + "strings" ) type Chat struct { @@ -27,39 +31,62 @@ func NewChat(sessionId, serverAddr, username string) *Chat { } func (c *Chat) Start() error { - c.InitUI() + c.initUI() if err := c.app.EnableMouse(true).Run(); err != nil { panic(err) } + + c.connectWS() return nil } -func (c *Chat) InitUI() error { +func (c *Chat) initUI() error { layout := tview.NewGrid(). SetRows(3, 0, 1). SetColumns(0). SetBorders(true) - newPrimitive := func(text string) tview.Primitive { - return tview.NewTextView(). - SetTextAlign(tview.AlignCenter). - SetText(text) - } - menu := newPrimitive("Menu") + tstreamText := tview.NewTextView(). + SetText("TStream"). + SetTextAlign(tview.AlignCenter) + + titleText := tview.NewTextView(). + SetText("Title") + + usernameText := tview.NewTextView(). + SetText("Username") + + nviewersText := tview.NewTextView(). + SetTextAlign(tview.AlignRight). + SetText("👤 10") + + uptimeText := tview.NewTextView(). + SetTextAlign(tview.AlignRight). + SetText("00:30:31") + + header := tview.NewGrid(). + SetRows(1, 0, 1). + SetColumns(0, 0, 0). + AddItem(tstreamText, 0, 0, 1, 3, 0, 0, false). + AddItem(titleText, 1, 0, 2, 2, 0, 0, false). + AddItem(usernameText, 2, 0, 1, 1, 0, 0, false). + AddItem(nviewersText, 1, 2, 1, 1, 0, 0, false). + AddItem(uptimeText, 2, 2, 1, 1, 0, 0, false) + ChatTextView := tview.NewTextView(). SetScrollable(true). SetDynamicColors(true). SetWordWrap(true).SetText("a\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\na\nb\nc\nd\ne\nf\ng\nh\n"). ScrollToEnd() - //sideBar := newPrimitive("Side Bar") + messageInput := tview.NewInputField() messageInput.SetLabel("[red]>[red] "). SetDoneFunc(func(key tcell.Key) { messageInput.SetText("") }) - layout.AddItem(menu, 0, 0, 1, 1, 0, 0, false). + layout.AddItem(header, 0, 0, 1, 1, 0, 0, false). AddItem(ChatTextView, 1, 0, 1, 1, 0, 0, false). AddItem(messageInput, 2, 0, 1, 1, 0, 0, true) @@ -67,17 +94,20 @@ func (c *Chat) InitUI() error { return nil } -//func main() { -// app := tview.NewApplication() -// inputField := tview.NewInputField(). -// SetLabel("Enter a number: "). -// SetPlaceholder("E.g. 1234"). -// SetFieldWidth(10). -// SetAcceptanceFunc(tview.InputFieldInteger). -// SetDoneFunc(func(key tcell.Key) { -// app.Stop() -// }) -// if err := app.SetRoot(inputField, true).EnableMouse(true).Run(); err != nil { -// panic(err) -// } -//} +func (c *Chat) connectWS() error { + scheme := "wss" + if strings.HasPrefix(c.serverAddr, "http://") { + scheme = "ws" + } + + host := strings.Replace(strings.Replace(c.serverAddr, "http://", "", 1), "https://", "", 1) + url := url.URL{Scheme: scheme, Host: host, Path: fmt.Sprintf("/ws/%s/chat", c.username)} + log.Printf("Openning socket at %s", url.String()) + + conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil) + if err != nil { + return fmt.Errorf("Failed to connected to websocket: %s", err) + } + c.conn = conn + return nil +} diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 84877c0..f0e7879 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -8,11 +8,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - ptyDevice "github.com/creack/pty" - "github.com/gorilla/websocket" - "github.com/qnkhuat/tstream/internal/cfg" - "github.com/qnkhuat/tstream/pkg/message" - "github.com/qnkhuat/tstream/pkg/ptyMaster" "io" "log" "net/http" @@ -20,6 +15,12 @@ import ( "os" "strings" "time" + + ptyDevice "github.com/creack/pty" + "github.com/gorilla/websocket" + "github.com/qnkhuat/tstream/internal/cfg" + "github.com/qnkhuat/tstream/pkg/message" + "github.com/qnkhuat/tstream/pkg/ptyMaster" ) // TODO: if we supports windows this should be changed @@ -71,7 +72,9 @@ func (s *Streamer) Start() error { err := s.ConnectWS() if err != nil { log.Println(err) - s.Stop("Failed to connect to server") + fmt.Println(err.Error()) + s.Stop(err.Error()) + return err } fmt.Printf("🔥 Streaming at: %s/%s\n", s.clientAddr, s.username) @@ -87,15 +90,6 @@ func (s *Streamer) Start() error { s.Winsize(ws.Rows, ws.Cols) }) - // Send room title - msg, err := message.Wrap(message.TStreamerConnect, &message.StreamerConnect{Title: s.title}) - if err == nil { - payload, _ := json.Marshal(msg) - s.conn.WriteMessage(websocket.TextMessage, payload) - } else { - log.Printf("Failed to wrap connect message: %s", err) - } - // Pipe command response to Pty and server go func() { mw := io.MultiWriter(os.Stdout, s) @@ -144,7 +138,9 @@ func (s *Streamer) Start() error { // Current for ping message only // TODO: secure this, otherwise server can control streamer terminal go func() { - _, _, err := s.conn.ReadMessage() + _, msg, _ := s.conn.ReadMessage() + wrappedMsg, _ := message.Unwrap(msg) + log.Printf("Not implemented response for message: %s", wrappedMsg.Type) if err != nil { log.Printf("Failed to receive message from server: %s", err) } @@ -167,7 +163,6 @@ func (s *Streamer) Start() error { } func (s *Streamer) RequestAddRoom() int { - // TODO: handle cases when call add api return existed body := map[string]string{"secret": s.secret} jsonValue, _ := json.Marshal(body) payload := bytes.NewBuffer(jsonValue) @@ -181,6 +176,9 @@ func (s *Streamer) RequestAddRoom() int { return resp.StatusCode } +// When connect is initlialized, streamer send a client info to server +// Then wait for a confirmation from server for whether or not this connection +// is authorized func (s *Streamer) ConnectWS() error { scheme := "wss" if strings.HasPrefix(s.serverAddr, "http://") { @@ -192,24 +190,56 @@ func (s *Streamer) ConnectWS() error { log.Printf("Openning socket at %s", url.String()) conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil) + s.conn = conn if err != nil { - return fmt.Errorf("Failed to connected to websocket: %s", err) + return fmt.Errorf("Failed to connect to server") } - // Handle server ping + //Handle server ping conn.SetPingHandler(func(appData string) error { return s.conn.WriteControl(websocket.PongMessage, emptyByteArray, time.Time{}) }) - s.conn = conn + // Handle server ping + conn.SetCloseHandler(func(code int, text string) error { + s.Stop("Closed connection by server") + return nil + }) + + // send client info so server can verify + clientInfo := message.ClientInfo{ + Name: s.username, + Role: message.RStreamer, + Secret: s.secret, + } + + msg, _ := message.Wrap(message.TClientInfo, clientInfo) + payload, _ := json.Marshal(msg) + err = conn.WriteMessage(websocket.TextMessage, payload) + if err != nil { + return fmt.Errorf("Failed to connect to server") + } + + // Verify server's response + _, resp, err := conn.ReadMessage() + wrappedMsg, err := message.Unwrap(resp) + log.Printf("Got a message: %s", wrappedMsg) + if wrappedMsg.Type == message.TStreamerUnauthorized { + return fmt.Errorf("Unauthorized connection") + } else if wrappedMsg.Type != message.TStreamerAuthorized { + return fmt.Errorf("Expect connect confirmation from server") + } + return nil } func (s *Streamer) Stop(msg string) { s.conn.WriteControl(websocket.CloseMessage, emptyByteArray, time.Time{}) s.conn.Close() - s.pty.Stop() - s.pty.Restore() + if s.pty != nil { + s.pty.Stop() + s.pty.Restore() + } fmt.Println() fmt.Println(msg) } @@ -246,7 +276,6 @@ func GetSecret(configPath string) string { // gen a new one if not existed if err != nil { cfg = NewCfg() - secret = GenSecret("tstream") cfg.Secret = GenSecret("tstream") WriteCfg(CONFIG_PATH, cfg) } else {