From 62e2691db96049b3dfe5e0ee46d0ad8332944ed2 Mon Sep 17 00:00:00 2001 From: Earther Date: Fri, 23 Jul 2021 18:34:31 +0700 Subject: [PATCH] Add toggle voicce server --- tstream/pkg/room/room.go | 2 - tstream/pkg/room/sfu.go | 1 - tstream/pkg/server/server.go | 10 +-- tstream/pkg/streamer/chat.go | 162 ++++++++++++++++++++++++----------- 4 files changed, 116 insertions(+), 59 deletions(-) diff --git a/tstream/pkg/room/room.go b/tstream/pkg/room/room.go index 5021f18..6b34cd4 100644 --- a/tstream/pkg/room/room.go +++ b/tstream/pkg/room/room.go @@ -385,14 +385,12 @@ func (r *Room) Broadcast(msg message.Wrapper, roles []message.CRole, IDExclude [ func (r *Room) Stop(status message.RoomStatus) { log.Printf("Stopping room: %s, with Status: %s", r.name, status) r.status = status - r.lock.Lock() for id, client := range r.clients { client.Close() r.RemoveClient(id) } r.sfu.Stop() r.streamer.Close() - r.lock.Unlock() } func (r *Room) PrepareRoomInfo() message.RoomInfo { diff --git a/tstream/pkg/room/sfu.go b/tstream/pkg/room/sfu.go index a005d84..411b9c1 100644 --- a/tstream/pkg/room/sfu.go +++ b/tstream/pkg/room/sfu.go @@ -149,7 +149,6 @@ func (s *SFU) AddPeer(cl *Client) error { return } } - log.Printf("The enddddddddddd") }) } diff --git a/tstream/pkg/server/server.go b/tstream/pkg/server/server.go index 0ab6c08..a14a57a 100644 --- a/tstream/pkg/server/server.go +++ b/tstream/pkg/server/server.go @@ -109,13 +109,9 @@ func (s *Server) Stop() { // interval : scan for every interval time // ildeThreshold : room with idle time above this threshold will be killed func (s *Server) repeatedlyCleanRooms(interval, idleThreshold int) { - tick := time.NewTicker(time.Duration(interval) * time.Second) - for { - select { - case <-tick.C: - c := s.scanAndCleanRooms(idleThreshold) - log.Printf("Cleaned %d rooms", c) - } + for _ = range time.Tick(time.Duration(interval) * time.Second) { + c := s.scanAndCleanRooms(idleThreshold) + log.Printf("Cleaned %d rooms", c) } } diff --git a/tstream/pkg/streamer/chat.go b/tstream/pkg/streamer/chat.go index 8b83528..afbdc43 100644 --- a/tstream/pkg/streamer/chat.go +++ b/tstream/pkg/streamer/chat.go @@ -9,8 +9,9 @@ import ( "github.com/gorilla/websocket" "github.com/pion/webrtc/v3" "github.com/qnkhuat/mediadevices" - "github.com/qnkhuat/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder + "github.com/qnkhuat/mediadevices/pkg/codec/opus" _ "github.com/qnkhuat/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter + "github.com/qnkhuat/mediadevices/pkg/prop" "github.com/qnkhuat/tstream/pkg/message" "github.com/rivo/tview" "log" @@ -22,6 +23,13 @@ import ( var decoder = schema.NewDecoder() +const mtu int = 1600 + +type MediaSession struct { + stream mediadevices.MediaStream + engine *webrtc.MediaEngine +} + type Chat struct { username string sessionId string @@ -29,6 +37,7 @@ type Chat struct { color string wsConn *websocket.Conn // for chat and roominfo peerConn *webrtc.PeerConnection // for voice + mediaSession *MediaSession app *tview.Application startedTime time.Time chatTextView *tview.TextView @@ -37,6 +46,8 @@ type Chat struct { titleTextView *tview.TextView muteBtn *tview.Button mute bool + + lastToggleMute time.Time } func NewChat(sessionId, serverAddr, username string) *Chat { @@ -58,11 +69,6 @@ func (c *Chat) Start() error { return err } - if err := c.StartVoiceService(); err != nil { - log.Printf("Failed to start voice service : %s", err) - c.addNoti(fmt.Sprintf("Failed to start voice service : %s", err)) - } - // blocking call if err := c.app.EnableMouse(true).Run(); err != nil { log.Printf("Error in UI app: %s", err) @@ -129,6 +135,7 @@ func (c *Chat) StartChatService() error { go func() { time.Sleep(1 * time.Second) c.addNoti("[yellow]Type /help to get list of available commands[white]") + c.addNoti("[yellow]Voice chat is off. Type /unmute to turn on voice chat[white]") }() go func() { @@ -162,46 +169,71 @@ func (c *Chat) StartChatService() error { return nil } -func (c *Chat) StartVoiceService() error { - config := webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{{ - URLs: []string{"stun:stun.l.google.com:19302"}}, - }, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } - +func GetMediaSession() (*MediaSession, error) { // Init microphone - mediaEngine := webrtc.MediaEngine{} + var mediaSession *MediaSession + engine := &webrtc.MediaEngine{} opusParams, err := opus.NewParams() if err != nil { - return err + return mediaSession, err } codecSelector := mediadevices.NewCodecSelector( mediadevices.WithAudioEncoders(&opusParams), ) - codecSelector.Populate(&mediaEngine) + codecSelector.Populate(engine) - api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine)) - peerConn, err := api.NewPeerConnection(config) - if err != nil { - log.Printf("Failed to start webrtc connection %s", err) - return err - } - - s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ - Audio: func(c *mediadevices.MediaTrackConstraints) {}, + stream, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ + Audio: func(c *mediadevices.MediaTrackConstraints) { + c.SampleRate = prop.Int(44100) + c.Latency = prop.Duration(0) + }, Codec: codecSelector, }) if err != nil { log.Printf("Failed to get user media %s", err) + return mediaSession, err + } + + mediaSession = &MediaSession{ + engine: engine, + stream: stream, + } + return mediaSession, nil +} + +func (c *Chat) StartVoiceService() error { + // media engine can only created once during the process + // in order to turn on/off audio track we init it once and use many times + if c.mediaSession == nil { + mediaSession, err := GetMediaSession() + if err != nil { + log.Printf("Failed to open media engine: %s", err) + return err + } + c.mediaSession = mediaSession + } + + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{{ + URLs: []string{"stun:stun.l.google.com:19302"}}, + }, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(c.mediaSession.engine)) + + peerConn, err := api.NewPeerConnection(config) + if err != nil { + log.Printf("Failed to start webrtc connection %s", err) return err } + c.peerConn = peerConn + // add tracks to peer connection - for _, track := range s.GetTracks() { - log.Printf("%s, %s, %s", track.ID(), track.Kind(), track.Kind) - log.Printf("adding track") + for _, track := range c.mediaSession.stream.GetTracks() { + // TODO: we probably want to stop the chat here // reproduce steps: try open a producer page while having chat on track.OnEnded(func(err error) { @@ -229,9 +261,10 @@ func (c *Chat) StartVoiceService() error { switch p { case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected: - c.Stop(fmt.Sprintf("Voice server status: %s", p)) + wsConn.Close() case webrtc.PeerConnectionStateConnected: + c.addNoti("[yellow]Voice server connected[white]") default: log.Printf("Not implemented: %s", p) @@ -269,7 +302,6 @@ func (c *Chat) StartVoiceService() error { err := wsConn.ReadJSON(&msg) if err != nil { log.Printf("Failed to read message: %s", err) - c.Stop("Failed to read message form server") return } @@ -345,6 +377,28 @@ func (c *Chat) StartVoiceService() error { } +func (c *Chat) StopVoiceServer() error { + if c.peerConn == nil { + return nil + } + log.Printf("Stoppign") + + for _, sender := range c.peerConn.GetSenders() { + log.Printf("removed: %v", sender) + if err := sender.Stop(); err != nil { + log.Printf("Failed to stop voice: %s", err) + } + c.peerConn.RemoveTrack(sender) + } + if err := c.peerConn.Close(); err != nil { + log.Printf("Failed to stop peerconenction: %s", err) + return err + } + log.Printf("stopped") + c.peerConn = nil + return nil +} + func (c *Chat) requestServer(msgType message.MType) error { payload := message.Wrapper{ Type: msgType, @@ -424,7 +478,7 @@ func (c *Chat) initUI() error { // Default is mute c.muteBtn = tview.NewButton("🔇"). SetSelectedFunc(func() { - c.toggleMute() + c.toggleMute(!c.mute) }) c.muteBtn.SetBackgroundColor(tcell.ColorBlack) @@ -454,12 +508,12 @@ func (c *Chat) HandleCommand(command string) error { args := strings.Split(command, " ") switch args[0] { case "help": - c.addNoti(` - TStream - Streaming from terimnal + c.addNoti(`TStream - Streaming from terimnal [green]/title[yellow] title[white] - to change stream title [green]/mute[white] - to turn on microphone [green]/unmute[white] - to turn off microphone - [green]/exit[white] - to exit chat room`) + [green]/exit[white] - to exit chat room + `) case "title": if len(args) > 1 { @@ -480,13 +534,10 @@ func (c *Chat) HandleCommand(command string) error { } case "mute": - if !c.mute { - c.toggleMute() - } + c.toggleMute(true) + case "unmute": - if c.mute { - c.toggleMute() - } + c.toggleMute(false) case "exit": c.Stop("Bye!") @@ -518,7 +569,6 @@ func (c *Chat) connectWS(role message.CRole) (*websocket.Conn, error) { Role: role, Secret: GetSecret(CONFIG_PATH), } - log.Printf("clientinfo %s", clientInfo) payload := message.Wrapper{Type: message.TClientInfo, Data: clientInfo} err = conn.WriteJSON(payload) @@ -547,16 +597,30 @@ func (c *Chat) connectWS(role message.CRole) (*websocket.Conn, error) { return conn, nil } -func (c *Chat) toggleMute() { - c.mute = !c.mute - if c.mute { +func (c *Chat) toggleMute(mute bool) { + if mute { + c.StopVoiceServer() c.muteBtn.SetLabel("🔇") - c.addNoti(`[yellow]Microphone: On[white]`) + c.addNoti(`[yellow]Microphone: Off[white]`) } else { + c.addNoti(`[yellow]Connecting to voice server...[white]`) + // BUG + // there is a bug if user toggle audio too fast, it won't able to connect to audio + // A quick fix for this is too make sure user wait enought time before turn it back on + + threshold := 8 * time.Second + time.Sleep(threshold - time.Now().Sub(c.lastToggleMute)) + if err := c.StartVoiceService(); err != nil { + log.Printf("Failed to start voice service : %s", err) + c.addNoti(fmt.Sprintf("Failed to start voice service : %s", err)) + return + } + c.muteBtn.SetLabel("🔈") - c.addNoti(`[yellow]Microphone: Off[white]`) + c.addNoti(`[yellow]Microphone: On[white]`) } - + c.mute = mute + c.lastToggleMute = time.Now() } func (c *Chat) addNoti(msg string) { @@ -591,7 +655,6 @@ func (c *Chat) addChatMsgs(chatList []message.Chat) { } func (c *Chat) Stop(msg string) { - fmt.Printf(msg) if c.wsConn != nil { c.wsConn.Close() } @@ -599,6 +662,7 @@ func (c *Chat) Stop(msg string) { c.peerConn.Close() } c.app.Stop() + fmt.Println(msg) } func FormatChat(name, content, color string) string {