Skip to content

Commit

Permalink
server add mechanism to verify websocket connection from streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
qnkhuat committed Jul 13, 2021
1 parent 00ee692 commit 25614bc
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 161 deletions.
4 changes: 2 additions & 2 deletions client/src/components/WSTerminal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class WSTerminal extends React.Component<Props, {}> {
this.rescale();
})

this.props.msgManager.pub("request", constants.MSG_TREQUEST_WINSIZE );
this.props.msgManager.pub("request", constants.MSG_TREQUEST_CACHE_MESSAGE );
this.props.msgManager.pub("request", constants.MSG_TREQUEST_WINSIZE);
this.props.msgManager.pub("request", constants.MSG_TREQUEST_CACHE_CONTENT);

window.addEventListener("resize", () => this.rescale());
this.rescale();
Expand Down
2 changes: 1 addition & 1 deletion client/src/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ export const MSG_TREQUEST_WINSIZE = "RequestWinsize";
export const MSG_TCHAT = "Chat";
export const MSG_TREQUEST_CHAT = "RequestChat";
export const COLOR_LIST = ["#1f9c19", "#bd4719", "#d6c613", "#13cfc5", "#144eb3", "#6e10cc", "#db12d5"]
export const MSG_TREQUEST_CACHE_MESSAGE = "RequestCacheMessage";
export const MSG_TREQUEST_CACHE_CONTENT = "RequestCacheContent";
export const MSG_TREQUEST_ROOM_INFO = "RequestRoomInfo";
export const MSG_TREQUEST_CACHE_CHAT = "RequestCacheChat";
5 changes: 3 additions & 2 deletions tstream/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ package main
import (
"flag"
"fmt"
"log"
"os"

"github.com/qnkhuat/tstream/internal/cfg"
"github.com/qnkhuat/tstream/internal/logging"
"github.com/qnkhuat/tstream/pkg/server"
"log"
"os"
)

func main() {
Expand Down
31 changes: 17 additions & 14 deletions tstream/cmd/tstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,18 @@ import (
"bufio"
"flag"
"fmt"
"github.com/manifoldco/promptui"
"github.com/qnkhuat/tstream/internal/cfg"
"github.com/qnkhuat/tstream/internal/logging"
"github.com/qnkhuat/tstream/pkg/streamer"
"log"
"os"
"os/user"
"regexp"

"github.com/manifoldco/promptui"
"github.com/qnkhuat/tstream/internal/cfg"
"github.com/qnkhuat/tstream/internal/logging"
"github.com/qnkhuat/tstream/pkg/streamer"
)

func main() {
// Check if current process is under a tstream session
if len(os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID)) > 0 {
fmt.Printf("This terminal is currently running under session: %s\nType 'exit' to stop the current session!\n", os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID))
os.Exit(1)
}

logging.Config("/tmp/tstream.log", "STREAMER: ")
flag.Usage = func() {
Expand All @@ -55,6 +51,7 @@ func main() {
os.Exit(0)
return
}

validateUsername := func(input string) error {
var validUsername = regexp.MustCompile(`^[a-z][a-z0-9]*[._-]?[a-z0-9]+$`)
if validUsername.MatchString(input) && len(input) > 3 && len(input) < 20 {
Expand All @@ -78,11 +75,11 @@ func main() {
}

var username string
cfg, err := streamer.ReadCfg(streamer.CONFIG_PATH)
config, err := streamer.ReadCfg(streamer.CONFIG_PATH)
if err != nil {
username = u.Username
} else {
username = cfg.Username
username = config.Username
}

promptUsername := promptui.Prompt{
Expand All @@ -98,6 +95,12 @@ func main() {

if !*chat {
// Start Streaming session

// Check if current process is under a tstream session
if len(os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID)) > 0 {
fmt.Printf("This terminal is currently running under session: %s\nType 'exit' to stop the current session!\n", os.Getenv(cfg.STREAMER_ENVKEY_SESSIONID))
os.Exit(1)
}
username, err = promptUsername.Run()
if err != nil {
os.Exit(1)
Expand Down Expand Up @@ -125,7 +128,7 @@ func main() {
os.Exit(1)
}
// Update config before start
cfg.Username = username
config.Username = username
streamer.UpdateCfg(streamer.CONFIG_PATH, "Username", username)

err = s.Start()
Expand All @@ -134,13 +137,13 @@ func main() {
}
} else {
var username = "" // also is sessionID
cfg, err := streamer.ReadCfg(streamer.CONFIG_PATH)
config, err := streamer.ReadCfg(streamer.CONFIG_PATH)

if err != nil {
fmt.Printf("No stream session detected\n")
os.Exit(1)
} else {
username = cfg.Username
username = config.Username
}

if username == "" {
Expand Down
70 changes: 59 additions & 11 deletions tstream/pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package message

import (
"encoding/json"
"fmt"
"time"
)

Expand All @@ -16,11 +17,12 @@ import (
type MType string

const (
TWrite MType = "Write"
TChat MType = "Chat"
TClose MType = "Close"
TError MType = "Error"
TRoomInfo MType = "RoomInfo"
TWrite MType = "Write"
TChat MType = "Chat"
TClose MType = "Close"
TError MType = "Error"
TRoomInfo MType = "RoomInfo"
TClientInfo MType = "ClientInfo"

// When streamer resize their termianl
TWinsize MType = "Winsize"
Expand All @@ -30,14 +32,20 @@ const (

TRequestRoomInfo MType = "RequestRoomInfo"

// When user first connect to server
TStreamerConnect MType = "StreamerConnect"

// when user first join the room, he can request for cached message to avoid idle screen
TRequestCacheMessage = "RequestCacheMessage"
TRequestCacheContent MType = "RequestCacheContent"

// when user first join the room, he can request for cached chat to avoid idle chat screen
TRequestCacheChat MType = "RequestCacheChat"

// Server can request client info to assign roles and verrification
TRequestClientInfo MType = "RequestClientInfo"

// when user first join the room, he can request for cache chat to avoid idle chat screen
TRequestCacheChat = "RequestCacheChat"
// Server will when this message if streamer is verified. Then streamer can proceed to start stream
TStreamerAuthorized MType = "StreamerAuthorized"

// If websocket connection is illegal. server send this message to streamer then close connection
TStreamerUnauthorized MType = "StreamerUnauthorized"
)

type Wrapper struct {
Expand Down Expand Up @@ -84,6 +92,21 @@ type RoomInfo struct {
Status RoomStatus
}

// *** Client ***
type CRole string

const (
RStreamerChat CRole = "StreamerChat" // Chat for streamer
RStreamer CRole = "Streamer" // Send content to server
RViewer CRole = "Viewer" // View content + chat
)

type ClientInfo struct {
Name string
Role CRole
Secret string
}

// *** Helper functions ***

func Unwrap(buff []byte) (Wrapper, error) {
Expand All @@ -92,6 +115,31 @@ func Unwrap(buff []byte) (Wrapper, error) {
return obj, err
}

// Unwrap the wrapper data as well
func Unwrap2(buff []byte) (MType, interface{}, error) {
msg := Wrapper{}
err := json.Unmarshal(buff, &msg)
if err != nil {
return msg.Type, nil, err
}

var msgObj interface{}
switch msg.Type {
case TChat:
msgObj = Chat{}
err = json.Unmarshal(msg.Data, msgObj)

case TRequestClientInfo:
msgObj = ClientInfo{}
err = json.Unmarshal(msg.Data, msgObj)

default:
err = fmt.Errorf("Not implemented")
}

return msg.Type, msgObj, err
}

func Wrap(msgType MType, msgObject interface{}) (Wrapper, error) {

data, err := json.Marshal(msgObject)
Expand Down
33 changes: 21 additions & 12 deletions tstream/pkg/viewer/viewer.go → tstream/pkg/room/client.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package viewer
/*
Generic struct for a websocket connection
Currently used for Viewer and Chat
*/
package room

import (
"github.com/gorilla/websocket"
"github.com/qnkhuat/tstream/pkg/message"
"log"
"time"
)

var emptyByteArray []byte

type Viewer struct {
type Client struct {
conn *websocket.Conn
id string
role message.CRole

// data go in Out channel will be send to user via websocket
Out chan []byte
Expand All @@ -21,30 +24,35 @@ type Viewer struct {
alive bool
}

func New(id string, conn *websocket.Conn) *Viewer {
func NewClient(role message.CRole, conn *websocket.Conn) *Client {
out := make(chan []byte, 256) // buffer 256 send requests
in := make(chan []byte, 256) // buffer 256 send requests
return &Viewer{
return &Client{
conn: conn,
id: id,
Out: out,
In: in,
role: role,
alive: true,
}
}

func (v *Viewer) Alive() bool {
func (v *Client) Role() message.CRole {
return v.role
}

func (v *Client) Alive() bool {
return v.alive
}

func (v *Viewer) Start() {
func (v *Client) Start() {
// Receive message coroutine
go func() {
for {
msg, ok := <-v.Out
if ok {
err := v.conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Printf("Failed to boardcast to %s. Closing connection", v.id)
log.Printf("Failed to boardcast to. Closing connection")
v.Close()
}
} else {
Expand All @@ -53,6 +61,7 @@ func (v *Viewer) Start() {
}
}()

// Send message coroutine
for {
_, msg, err := v.conn.ReadMessage()
if err == nil {
Expand All @@ -65,7 +74,7 @@ func (v *Viewer) Start() {
}
}

func (v *Viewer) Close() {
func (v *Client) Close() {
v.conn.WriteControl(websocket.CloseMessage, emptyByteArray, time.Time{})
v.alive = false
v.conn.Close()
Expand Down
Loading

0 comments on commit 25614bc

Please # to comment.