From 51b568f35171f13e6421f06de8bff8fb41021ed7 Mon Sep 17 00:00:00 2001 From: minhnguyen Date: Wed, 15 Mar 2023 21:36:13 +0700 Subject: [PATCH] Update node #1 --- core/broadcast.go | 35 ++++++++++++++ core/client.go | 113 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 core/broadcast.go create mode 100644 core/client.go diff --git a/core/broadcast.go b/core/broadcast.go new file mode 100644 index 0000000000..94605b6116 --- /dev/null +++ b/core/broadcast.go @@ -0,0 +1,35 @@ +package core + +type Broadcast struct { + allClient map[*Client]bool + broadcastMessage chan []byte + registerClient chan *Client + unregisterClient chan *Client +} + +func NewBroadcast() *Broadcast { + return &Broadcast{ + allClient: make(map[*Client]bool), + broadcastMessage: make(chan []byte), + registerClient: make(chan *Client), + unregisterClient: make(chan *Client), + } +} +func (b *Broadcast) Run() { + for { + select { + case newClient := <-b.registerClient: + b.allClient[newClient] = true + case clientData := <-b.unregisterClient: + _, stateClient := b.allClient[clientData] + if stateClient { + delete(b.allClient, clientData) + close(clientData.sendMessage) + } + case messageData := <-b.broadcastMessage: + for clientData := range b.allClient { + clientData.sendMessage <- messageData + } + } + } +} diff --git a/core/client.go b/core/client.go new file mode 100644 index 0000000000..15cec3115b --- /dev/null +++ b/core/client.go @@ -0,0 +1,113 @@ +package core + +import ( + "fmt" + "github.com/gorilla/websocket" + "log" + "net/http" + "time" +) + +const ( + pongWait = 10 * time.Second + pingPeriod = 5 * time.Second + writeDeadline = 10 * time.Second +) + +var upgraderConn = &websocket.Upgrader{} + +type Client struct { + clientBroadcast *Broadcast + websocketConn *websocket.Conn + sendMessage chan []byte +} + +func (c *Client) readPump() { + defer func() { + c.clientBroadcast.unregisterClient <- c + c.websocketConn.Close() + }() + + err := c.websocketConn.SetReadDeadline(time.Now().Add(pongWait)) + if err != nil { + return + } + + c.websocketConn.SetPongHandler(func(string) error { + fmt.Println("Received Pong") + err = c.websocketConn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + _, _, err = c.websocketConn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Println("Error", err) + } + log.Println("Error", err) + break + } + } +} + +func (c *Client) writePump() { + newTicker := time.NewTicker(pingPeriod) + defer func() { + newTicker.Stop() + c.websocketConn.Close() + }() + for { + select { + case messageData, stateData := <-c.sendMessage: + err := c.websocketConn.SetWriteDeadline(time.Now().Add(pingPeriod)) + if err != nil { + return + } + if !stateData { + log.Println("Close the channel") + c.websocketConn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + c.websocketConn.WriteMessage(websocket.BinaryMessage, messageData) + + err = c.websocketConn.SetWriteDeadline(time.Time{}) + if err != nil { + return + } + case <-newTicker.C: + log.Println("Send Ping") + err := c.websocketConn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + return + } + + c.websocketConn.WriteMessage(websocket.PingMessage, nil) + err = c.websocketConn.SetWriteDeadline(time.Time{}) + if err != nil { + return + } + } + } +} + +func serveWs(broadcast *Broadcast, w http.ResponseWriter, r *http.Request) { + websocketConn, err := upgraderConn.Upgrade(w, r, nil) + if err != nil { + return + } + + newClient := &Client{ + clientBroadcast: broadcast, + websocketConn: websocketConn, + sendMessage: make(chan []byte, 256), + } + newClient.clientBroadcast.registerClient <- newClient + go newClient.writePump() + go newClient.readPump() +} + +func messageHandler(broadcast *Broadcast, b []byte) { + broadcast.broadcastMessage <- b +}