Skip to content

Commit

Permalink
Merge pull request #228 from twitchdev/fix-226
Browse files Browse the repository at this point in the history
Fix 226 and 227
  • Loading branch information
Xemdo authored Apr 11, 2023
2 parents d10cf02 + 403aa18 commit 128f0ca
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 39 deletions.
5 changes: 5 additions & 0 deletions internal/events/websocket/mock_server/close_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ type CloseMessage struct {
}

var (
closeClientDisconnected = &CloseMessage{
code: 1000,
message: "client disconnected",
}

closeInternalServerError = &CloseMessage{
code: 4000,
message: "internal server error",
Expand Down
56 changes: 31 additions & 25 deletions internal/events/websocket/mock_server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import (

type ServerManager struct {
serverList *util.List[WebSocketServer]
reconnectTesting bool
primaryServer string
ip string
port int
debugEnabled bool
strictMode bool
sslEnabled bool
protocolHttp string
protocolWs string
reconnectTesting bool // Indicates if the server is in the process of running a simulation server reconnect/restart
primaryServer string // The current primary server by its ID. This should be in serverList
ip string // IP the server will bind to
port int // Port the server will bind to
debugEnabled bool // Indicates if the server was started with --debug
strictMode bool // Indicates if the server was started with --require-subscriptions
sslEnabled bool // Indicates if the server was started with --ssl
protocolHttp string // String for the HTTP protocol URIs (http or https)
protocolWs string // String for the WS protocol URIs (ws or wss)
}

var serverManager *ServerManager
Expand Down Expand Up @@ -269,11 +269,13 @@ func subscriptionPageHandlerGet(w http.ResponseWriter, r *http.Request) {
Status: subscription.Status,
Type: subscription.Type,
Version: subscription.Version,
Condition: EmptyStruct{},
Condition: subscription.Conditions,
CreatedAt: subscription.CreatedAt,
Transport: SubscriptionTransport{
Method: "websocket",
SessionID: fmt.Sprintf("%v_%v", server.ServerId, clientName),
Method: "websocket",
SessionID: fmt.Sprintf("%v_%v", server.ServerId, clientName),
ConnectedAt: subscription.ClientConnectedAt,
DisconnectedAt: subscription.ClientDisconnectedAt,
},
Cost: 0,
})
Expand Down Expand Up @@ -378,6 +380,8 @@ func subscriptionPageHandlerPost(w http.ResponseWriter, r *http.Request) {
CreatedAt: time.Now().UTC().Format(time.RFC3339Nano),
Status: STATUS_ENABLED, // https://dev.twitch.tv/docs/api/reference/#get-eventsub-subscriptions
SessionClientName: clientName,
Conditions: body.Condition,
ClientConnectedAt: client.ConnectedAtTimestamp,
}

var subs []Subscription
Expand All @@ -397,19 +401,21 @@ func subscriptionPageHandlerPost(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)

json.NewEncoder(w).Encode(&SubscriptionPostSuccessResponse{
Body: SubscriptionPostSuccessResponseBody{
ID: subscription.SubscriptionID,
Status: subscription.Status,
Type: subscription.Type,
Version: subscription.Version,
Condition: EmptyStruct{},
CreatedAt: subscription.CreatedAt,
Transport: SubscriptionTransport{
Method: "websocket",
SessionID: fmt.Sprintf("%v_%v", server.ServerId, clientName),
ConnectedAt: client.ConnectedAtTimestamp,
Data: []SubscriptionPostSuccessResponseBody{
{
ID: subscription.SubscriptionID,
Status: subscription.Status,
Type: subscription.Type,
Version: subscription.Version,
Condition: subscription.Conditions,
CreatedAt: subscription.CreatedAt,
Transport: SubscriptionTransport{
Method: "websocket",
SessionID: fmt.Sprintf("%v_%v", server.ServerId, clientName),
ConnectedAt: client.ConnectedAtTimestamp,
},
Cost: 0,
},
Cost: 0,
},
Total: 0,
MaxTotalCost: 10,
Expand Down Expand Up @@ -519,7 +525,7 @@ func handlerResponseErrorUnauthorized(w http.ResponseWriter, message string) {
func handlerResponseErrorConflict(w http.ResponseWriter, message string) {
w.WriteHeader(http.StatusConflict)
bytes, _ := json.Marshal(&SubscriptionPostErrorResponse{
Error: "Unauthorized",
Error: "Conflict",
Message: message,
Status: 409,
})
Expand Down
12 changes: 7 additions & 5 deletions internal/events/websocket/mock_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func (ws *WebSocketServer) WsPageHandler(w http.ResponseWriter, r *http.Request)
log.Printf("read err [%v]: %v", client.clientName, err)

ws.muClients.Lock()
client.CloseWithReason(closeNetworkError)
ws.handleClientConnectionClose(client, closeNetworkError)
client.CloseWithReason(closeClientDisconnected)
ws.handleClientConnectionClose(client, closeClientDisconnected)
ws.muClients.Unlock()
break
}
Expand Down Expand Up @@ -488,9 +488,11 @@ func (ws *WebSocketServer) handleClientConnectionClose(client *Client, closeReas
if ws.Status == 2 {
ws.muSubscriptions.Lock()
subscriptions := ws.Subscriptions[client.clientName]
for _, subscription := range subscriptions {
if subscription.Status == STATUS_ENABLED {
subscription.Status = getStatusFromCloseMessage(closeReason)
for i := range subscriptions {
if subscriptions[i].Status == STATUS_ENABLED {
subscriptions[i].Status = getStatusFromCloseMessage(closeReason)
subscriptions[i].ClientConnectedAt = ""
subscriptions[i].ClientDisconnectedAt = time.Now().UTC().Format(time.RFC3339Nano)
}
}
ws.Subscriptions[client.clientName] = subscriptions
Expand Down
28 changes: 19 additions & 9 deletions internal/events/websocket/mock_server/subscription.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package mock_server

import "github.com/twitchdev/twitch-cli/internal/models"

type Subscription struct {
SubscriptionID string // Random GUID for the subscription
ClientID string // Client ID included in headers
Expand All @@ -8,14 +10,19 @@ type Subscription struct {
CreatedAt string // Timestamp of when the subscription was created
Status string // Status of the subscription
SessionClientName string // Client name of the session this is associated with.

ClientConnectedAt string // Time client connected
ClientDisconnectedAt string // Time client disconnected

Conditions models.EventsubCondition // Values of the subscription's condition object
}

// Request - POST /eventsub/subscriptions
type SubscriptionPostRequest struct {
Type string `json:"type"`
Version string `json:"version"`
Condition interface{} `json:"condition"`
Type string `json:"type"`
Version string `json:"version"`

Condition models.EventsubCondition `json:"condition"`
Transport SubscriptionPostRequestTransport `json:"transport"`
}

Expand All @@ -27,7 +34,7 @@ type SubscriptionPostRequestTransport struct {

// Response (Success) - POST /eventsub/subscriptions
type SubscriptionPostSuccessResponse struct {
Body SubscriptionPostSuccessResponseBody `json:"body"`
Data []SubscriptionPostSuccessResponseBody `json:"data"`

Total int `json:"total"`
MaxTotalCost int `json:"max_total_cost"`
Expand All @@ -44,8 +51,8 @@ type SubscriptionPostSuccessResponseBody struct {
CreatedAt string `json:"created_at"`
Cost int `json:"cost"`

Condition EmptyStruct `json:"condition"`
Transport SubscriptionTransport `json:"transport"`
Condition models.EventsubCondition `json:"condition"`
Transport SubscriptionTransport `json:"transport"`
}

// Response (Error) - POST /eventsub/subscriptions
Expand All @@ -67,9 +74,10 @@ type SubscriptionGetSuccessResponse struct {

// Cross-usage
type SubscriptionTransport struct {
Method string `json:"method"`
SessionID string `json:"session_id"`
ConnectedAt string `json:"connected_at"`
Method string `json:"method"`
SessionID string `json:"session_id"`
ConnectedAt string `json:"connected_at,omitempty"`
DisconnectedAt string `json:"disconnected_at,omitempty"`
}

// Cross-usage
Expand Down Expand Up @@ -112,6 +120,8 @@ func getStatusFromCloseMessage(reason *CloseMessage) string {
code := reason.code

switch code {
case 1000:
return STATUS_WEBSOCKET_DISCONNECTED
case 4000:
return STATUS_INTERNAL_ERROR
case 4001:
Expand Down

0 comments on commit 128f0ca

Please # to comment.