
Websocket ping pong timeout #866

Open
BigBoulard opened this issue Nov 14, 2023 · 13 comments

Comments

@BigBoulard

Hi guys,

I'm trying to build a chat app; a simplified version of it is shown below.

What the chat app is doing:

  • upgrades the connection to a WS connection
  • sets a read limit
  • sets a read deadline
  • sets a pong handler that will print PONG and then reset the read deadline
  • launches 2 goroutines for reading and writing
    • readMessages: reads any incoming message and closes the program on any unexpected error
    • writeMessages:
      • reads the egress channel and writes the received payload back to the WebSocket
      • starts a ticker that sends a PING control frame to the WebSocket
const (
	pingInterval = 2 * time.Second // interval at which we send a PING test
	pongWait     = 4 * time.Second // PONG timeout: a PONG must arrive within this window
)

type WSClient struct {
	wsconn *websocket.Conn
	egress chan []byte // all writes go through this channel because a WS conn supports only one concurrent writer
}

type pongHandler func(string) error

func NewWSClient(c *gin.Context) error {
	// create a WSClient instance
	wsClient := &WSClient{
		egress: make(chan []byte),
	}

	// create Web Socket connection
	conn, err := createWS(c.Writer, c.Request, wsClient.pongHandler)
	if err != nil {
		return fmt.Errorf("app.NewWSClient: createWS error: %w", err)
	}
	wsClient.wsconn = conn

	// launch the read/write goroutines
	go wsClient.readMessages()
	go wsClient.writeMessages()

	return nil
}

// create the WS conn and configure
func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
	conn, err := Upgrade(w, r)
	if err != nil {
		return nil, fmt.Errorf("createWS: upgrader.Upgrade error: %w", err)
	}

	conn.SetReadLimit(conf.MessageMaxSize)

	if err := conn.SetReadDeadline(time.Time{}); err != nil {
		return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
	}

	if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
	}

	conn.SetPongHandler(pongHandler)

	return conn, nil
}

// readMessages is run as a goroutine
func (c *WSClient) readMessages() {
	for {
		messageType, payload, err := c.wsconn.ReadMessage()
		if err != nil {
			log.Printf("wsconn.ReadMessage() error: %v", err)
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { // , websocket.CloseAbnormalClosure) {
				log.Fatalf("websocket.IsUnexpectedCloseError: %v", err)
			}
			c.wsconn.Close()
			return
		}
		log.Printf("received %s:\n%s", c.wsMsgTypeToString(messageType), string(payload))
		// writes the message back to the WS
		c.egress <- payload
	}
}

// write messages to the WS
func (c *WSClient) writeMessages() {
	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()

	for {
		select {
		case data, ok := <-c.egress:
			if !ok {
				// if channel is closed, send a CLOSE signal to the WS
				if err := c.wsconn.WriteMessage(websocket.CloseMessage, nil); err != nil {
					log.Printf("egress channel and WS are closed %v", err)
				}
				c.wsconn.Close()
				return
			}
			// write the message to the connection
			if err := c.wsconn.WriteMessage(websocket.TextMessage, data); err != nil {
				log.Printf("WS closed: %v", err)
				c.wsconn.Close()
				return
			}

		case <-ticker.C:
			// Send a PING msg to the WS.
			if err := c.wsconn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				log.Printf("Ping attempt but WS closed: %v", err)
				return
			}
		}
	}
}

// pongHandler runs on each received PONG and pushes the read deadline forward
func (c *WSClient) pongHandler(pongMsg string) error {
	log.Print("PONG") // NOT REACHED
	return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}

func (c *WSClient) wsMsgTypeToString(msgType int) string {
	switch msgType {
	case 1:
		return "TEXT"
	case 2:
		return "BINARY"
	case 8:
		return "CLOSE"
	case 9:
		return "PING"
	case 10:
		return "PONG"
	default:
		return "unknown"
	}
}

What the stress-test script is doing

  • creates X rooms of 2 users
  • a WS connection is created for each user
  • a conversation is a sequence like this: user 1 writes, user 1 reads, user 2 writes, user 2 reads ... in an infinite loop
  • a PING handler, which is supposed to reply to each PING with a PONG frame, is set on each connection
const (
	nbRooms         = 3300 // = 6600 users
	minIdleTimeInMs = 800
	maxIdleTimeInMs = 2000
	throttle        = time.Millisecond * 50
)

func main() {
	configure()
	createChatRooms()
	handleShutdown()
}

func createChatRooms() {
	for i := 0; i < nbRooms; i++ {
		roomID := uuid.New().String()
		user1 := GenUser()
		user2 := GenUser()
		go func(roomID string, u1 User, u2 User) {
			startChat(roomID, u1, u2)
		}(roomID, *user1, *user2)
		time.Sleep(throttle)
	}
}

func startChat(roomID string, u1 User, u2 User) {
	// create users in DB etc.

	// create ws conns
	conn1, err := getWSConn(&u1.User, room.RoomID)
	if err != nil {
		log.Error(err, "getWSConn error")
		return
	}
	conn2, err := getWSConn(&u2.User, room.RoomID)
	if err != nil {
		log.Error(err, "getWSConn error")
		return
	}

	// Start Goroutines for handling messages from user 1 and user 2
	go startConversation(room, u1, conn1, u2, conn2)

	// WARN: connections are never closed intentionally here
}

func startConversation(room *roomdom.Room, u1 User, conn1 *websocket.Conn, u2 User, conn2 *websocket.Conn) {
	defer func() {
		conn1.Close()
		conn2.Close()
	}()
	log.Info(fmt.Sprintf("%s and %s are conversing on %s", u1.User.FirstName, u2.User.FirstName, room.RoomID))
	for {
		// write msg to conn1
		msg1, err := genMsg(u1.User.UID, room.RoomID)
		if err != nil {
			log.Fatal(err, "msg1 genMsg")
			return
		}
		if err := conn1.WriteMessage(websocket.TextMessage, msg1); err != nil {
			log.Fatal(err, "writeMsg1 error")
			return
		}
		time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))

		// read msg from conn1
		messageType, _, err := conn1.ReadMessage()
		if messageType == websocket.PingMessage || messageType == websocket.PongMessage {
			log.Info("startConversation", "received %s", wsMsgTypeToString(messageType)) // I NEVER receive any PONG message here
		}
		// messageType, payload, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err, "conn.ReadMessages error")
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Fatal(err, "UnexpectedCloseError")
			}
		}

		// write msg to conn2
		msg2, err := genMsg(u2.User.UID, room.RoomID)
		if err != nil {
			log.Error(err, "msg2 genMsg")
			return
		}
		if err := conn2.WriteMessage(websocket.TextMessage, msg2); err != nil {
			log.Error(err, "writeMsg2 error")
			return
		}

		// read msg from conn2
		_, _, err = conn2.ReadMessage()
		// messageType, payload, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err, "conn.ReadMessages error")
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Fatal(err, "UnexpectedCloseError")
			}
		}

		time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))
	}
}

func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
	if err != nil {
		// conn is nil when Dial fails, so return before touching it
		return nil, err
	}
	conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})
	return conn, nil
}

func wsMsgTypeToString(msgType int) string {
	switch msgType {
	case 1:
		return "TEXT"
	case 2:
		return "BINARY"
	case 8:
		return "CLOSE"
	case 9:
		return "PING"
	case 10:
		return "PONG"
	default:
		return "unknown"
	}
}

func handleShutdown() {
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	<-quit
	log.Info("Shutting down server gracefully...")
}

Problem

The stress-test script receives the PING frame and tries to reply with a PONG, but the chat app never sees an incoming PONG frame, so the connection times out:

wsconn.ReadMessage() error: read tcp 127.0.0.1:8090->127.0.0.1:55896: i/o timeout

Subsidiary questions

  • What's the difference/benefit of using ReadMessage/WriteMessage vs. NextReader/NextWriter?
  • I'm using WriteMessage to send control frames. I saw that there's also WriteControl, which takes a deadline and then calls SetWriteDeadline. I know that, unlike WriteMessage, WriteControl is safe to use concurrently. The thing is, in my chat app I only set a read deadline, because the "idleness" of a connection is detected when the user leaves, that is, from a server-side perspective, when there's no more data to read. I don't know how I'm supposed to use WriteControl in this context.
  • If I remove the ping-pong part of the algorithm, the chat app can handle around 16,350 WebSockets, and then I get this error:
    error:websocket: close 1006 (abnormal closure): unexpected EOF.
    What can cause this issue, given that I don't have many logs?

Originally posted by @BigBoulard in gorilla/.github#26

@ghost

ghost commented Nov 15, 2023

Control messages like PING are processed when reading a message. Ensure that the client reads all messages.

What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter?

NextReader and NextWriter are the core functionality. ReadMessage is a helper method that gets a reader using NextReader and reads from that reader into a buffer. WriteMessage is a helper method that gets a writer using NextWriter, writes the message, and closes the writer.

I don't know how I'm supposed to use WriteControl in this context.

Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.

Applications should write with a deadline to protect against peers that do not read from the socket.

@BigBoulard
Author

BigBoulard commented Nov 16, 2023

Hi @pennystevens,

Thanks for getting back to me to provide clarity on this.

  • The main problem: if I remove the ping-pong algorithm, the chat app handles around 16,350 WebSockets, and then I get this error: error:websocket: close 1006 (abnormal closure): unexpected EOF.
    Is there a way to get more information on what's causing the issue?

I'm running both the app and the stress-test script in Docker Desktop containers. The script is replicated because each instance breaks after connecting around 7,500 WebSockets (same on localhost), so I launch several instances. I see no CPU or memory pressure. I think I'm far from the number of available sockets (about 64k, I think), so I'm looking for limits that may need to be set through the Go compiler: maximum number of goroutines, etc. I don't know.
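One limit worth ruling out (an assumption, not a diagnosis of this setup) is the per-process open-file limit, since every socket holds a file descriptor. A minimal Linux/macOS sketch that reads it with syscall.Getrlimit; openFileLimit is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit returns the soft and hard RLIMIT_NOFILE values for this
// process. Each accepted or dialed socket consumes one file descriptor.
func openFileLimit() (uint64, uint64, error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}
	return rl.Cur, rl.Max, nil
}

func main() {
	soft, hard, err := openFileLimit()
	if err != nil {
		fmt.Println("getrlimit:", err)
		return
	}
	fmt.Printf("open files: soft=%d hard=%d\n", soft, hard)
}
```

Since Go 1.19, programs that import os raise the soft limit to the hard limit at startup, so the hard limit configured for the container is usually the one that matters.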

  • The second issue is that the server doesn't see any PONG message from the stress-test script, despite the call to conn.WriteMessage(websocket.PongMessage, []byte{}) in the ping handler set when the script creates a connection...
func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
	conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})
	return conn, err
}

... and the creation of the PONG handler in the app:

func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
//  ...
	conn.SetPongHandler(pongHandler)
	return conn, nil
}
// ...
// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
	log.Print("PONG") // NOT REACHED
	return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}

Is there something I'm missing?

@hulkingshtick

I noticed some problems with the code in the issue.

This code:

	messageType, _, err := conn1.ReadMessage()
	if messageType == websocket.PingMessage || messageType == websocket.PongMessage {
		log.Info("startConversation", "received %s", wsMsgTypeToString(messageType)) // I NEVER receive any PONG message here
	}

will never report a ping or pong message because ReadMessage only returns data messages (text, binary). Control messages are handled by callbacks or, in the case of close, the error return value.
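A toy model of that behaviour, written without gorilla so it runs standalone: control frames are consumed inside the read call and routed to callbacks, and only data frames come back to the caller. The toyReader type, the frame struct and the nextData method are hypothetical; only the opcode values come from RFC 6455.

```go
package main

import "fmt"

// Opcodes from RFC 6455, section 5.2.
const (
	opText  = 0x1
	opClose = 0x8
	opPing  = 0x9
	opPong  = 0xA
)

type frame struct {
	op      int
	payload string
}

// toyReader models a WebSocket read call: control frames are handled inside
// nextData via callbacks; only data frames are returned to the caller.
type toyReader struct {
	frames []frame
	onPing func(string)
	onPong func(string)
}

func (r *toyReader) nextData() (int, string, error) {
	for len(r.frames) > 0 {
		f := r.frames[0]
		r.frames = r.frames[1:]
		switch f.op {
		case opPing:
			r.onPing(f.payload)
		case opPong:
			r.onPong(f.payload)
		case opClose:
			// Close is reported through the error return value.
			return 0, "", fmt.Errorf("close frame received: %q", f.payload)
		default:
			return f.op, f.payload, nil
		}
	}
	return 0, "", fmt.Errorf("no more frames")
}

func main() {
	r := &toyReader{
		frames: []frame{{opPong, ""}, {opPing, "x"}, {opText, "hi"}},
		onPing: func(data string) { fmt.Println("ping handler got:", data) },
		onPong: func(string) { fmt.Println("pong handler called") },
	}
	op, msg, err := r.nextData()
	fmt.Println(op, msg, err) // the caller only sees the text frame
}
```

It also shows the corollary: if no goroutine calls the read method, the ping and pong callbacks never run.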

The ping handler:

	conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})

has three problems.

  • The handler does not echo the data.
  • The handler calls WriteMessage concurrently with other writes.
  • The handler can terminate read on the connection early because it returns all errors from WriteMessage.

Fix the ping handler by starting from the default ping handler code:

	conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // <-- your code added here
		// Make a best effort to send the pong message, echoing the ping payload.
		// writeWait is an application-defined write timeout.
		_ = conn.WriteControl(websocket.PongMessage, []byte(data), time.Now().Add(writeWait))
		return nil
	})

The previous comment sets the pong handler to pongHandler. Is pongHandler set to (*WSClient).pongHandler, or is there more than one pong-handling function in the code?

@Oudwins

Oudwins commented Jan 8, 2025

I'm having a similar, but not identical, issue. My pong handler is never called, so the connection is closed on a keepalive timeout. I'm unsure whether this is an issue with the lib, my implementation or the server I'm connecting to. I have been trying to disable the pings from my server in case it's the latter, but with no luck.

// pong handler
	r.serverWebsocket.SetPongHandler(func(data string) error {
		r.serverWebsocket.SetReadDeadline(time.Now().Add(pongWait))
		slog.Debug("Received pong from server", "data", data)
		return nil
	})
// ping handler
	r.serverWebsocket.SetPingHandler(func(data string) error {
		slog.Debug("Received ping from server", "data", data)
		err := r.serverWebsocket.WriteControl(websocket.PongMessage, []byte(data), time.Now().Add(writeWait))
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

error I keep getting:

{"time":"2025-01-08T16:54:21.527656997+01:00","level":"DEBUG","msg":"Error reading from Server WebSocket","error":"websocket: close 1011 (internal server error): keepalive ping timeout"}

@hulkingshtick

@Oudwins Does your application send pings to the peer?

Your ping handler is unnecessary.

@Oudwins

Oudwins commented Jan 8, 2025

@hulkingshtick I'm not manually sending any pings, so I assumed that gorilla sends them under the hood?

The ping handler (per the godoc comment) triggers when the client sends a ping. I assumed that means it triggers when the server I connect to sends a ping? I copied the default ping handler just to add a log to check whether the server I'm connecting to sends pings. It does appear to, since I do get those logs.

edit: also, thank you. I can't believe you replied so fast, happy new year! and massive thank you

@hulkingshtick

The package does not send pings under the hood. The application must send a ping to receive a pong.

The package does respond to pings by sending a pong.

@Oudwins

Oudwins commented Jan 8, 2025

Then do you know what could cause me to get this error if I am not sending any pings?

websocket: close 1011 (internal server error): keepalive ping timeout

I'm seriously confused. I keep getting that error approximately every 80 seconds.

Edit: I'm just looking through the logs of my load tests and it is exactly every 81 s, super weird.

@hulkingshtick

Does your application contain the text keepalive ping timeout? If so, show that code.

Based on the information shared here, your application must send PINGs. Your application advances the read deadline in response to a received PONG. The peer sends a PONG in response to a received PING. If your application does not send PINGs, then your application does not receive PONGs. If your application does not receive PONGs, then your application does not advance the read deadline.

@Oudwins

Oudwins commented Jan 8, 2025

My app code does not contain the text keepalive anywhere.

The logs contain multiple received ping logs but no received pong logs

To be clear, I have some more code elsewhere that advances the read deadline, if that is relevant, because the pong handler is never called.

@hulkingshtick

My app code does not contain the text keepalive anywhere.

If your application does not contain the text keepalive, then the peer application initiated the closing handshake with that error message. The peer application's error message implies that the peer application is waiting for a PING. Perhaps the peer application expects your application to send PINGs, even though that's not required by the WebSocket protocol. Try sending PINGs.

The logs contain multiple received ping logs but no received pong logs

That is expected because:

I'm not manually sending any pings.

Your application must send a PING to receive a PONG.

@Oudwins

Oudwins commented Jan 8, 2025

Right, right. I'm now sending pings every 5 seconds and receiving pongs back with no issue. But the connection still gets closed with the previously mentioned error every 80.5 seconds or so. Is there some way I can verify that this error is being sent by the peer application?
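One way to check: in gorilla, when the read loop receives a Close frame it returns a *websocket.CloseError whose Code and Text are decoded from the frame body, and its message is formatted as "websocket: close <code> (<name>): <text>". So a 1011 error with the text "keepalive ping timeout" came from the peer. The stdlib-only sketch below decodes a Close frame body per RFC 6455, section 5.5.1; parseClosePayload is a hypothetical helper, not a gorilla function.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseClosePayload decodes a Close frame body: a 2-byte big-endian status
// code followed by a UTF-8 reason (RFC 6455, section 5.5.1).
func parseClosePayload(p []byte) (uint16, string, error) {
	if len(p) == 0 {
		return 1005, "", nil // 1005: no status code present (RFC 6455, section 7.4.1)
	}
	if len(p) < 2 {
		return 0, "", errors.New("close payload too short")
	}
	return binary.BigEndian.Uint16(p[:2]), string(p[2:]), nil
}

func main() {
	// The bytes a peer sends on the wire for "1011 keepalive ping timeout".
	payload := append([]byte{0x03, 0xF3}, "keepalive ping timeout"...)
	code, reason, err := parseClosePayload(payload)
	fmt.Println(code, reason, err) // 1011 keepalive ping timeout <nil>
}
```

By contrast, RFC 6455 forbids sending 1006 in a Close frame; gorilla reports "close 1006 (abnormal closure): unexpected EOF" locally when the connection ends without a Close frame, which is the error from the original post's third question.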

@ivanjaros

You have to implement pong. The browser sends pings on its own (and they will not show in the console). It took me a while to figure out how to do it, because this library is truly "bare bones", but the poor DX is most likely an effect of the spec rather than of the library itself. IIRC you have to a) enable pong (I think setting the pong function to nil should trigger the default behavior) and b) have a reader and a listener running, even if you need only one. Otherwise the ping/pong mechanism is not handled, due to how this library works: there is only one connection and the library determines the message type from the frame type, so if you are not listening and writing, the ping/pong frames on both sides will not be processed.


4 participants