From 6ec9a3cd937a9eab7e07a6cadd1729fcd27ef854 Mon Sep 17 00:00:00 2001 From: Johannes Christ Date: Thu, 1 Jun 2023 10:22:48 +0200 Subject: [PATCH 1/4] Convert shard sessions into state machines This fixes the previous implementation's problem of retrying connections indefinitely without any proper handling of errors. Ratelimiting will be added separately. --- lib/nostrum/shard/event.ex | 27 ++- lib/nostrum/shard/session.ex | 316 ++++++++++++++++++++++----------- lib/nostrum/struct/ws_state.ex | 10 -- 3 files changed, 229 insertions(+), 124 deletions(-) diff --git a/lib/nostrum/shard/event.ex b/lib/nostrum/shard/event.ex index 7a8b093c7..a9c8f447b 100644 --- a/lib/nostrum/shard/event.ex +++ b/lib/nostrum/shard/event.ex @@ -20,42 +20,41 @@ defmodule Nostrum.Shard.Event do if payload.t == :READY do Logger.info("READY") - %{state | session: payload.d.session_id} + {%{state | session: payload.d.session_id}, []} else - state + {state, []} end end def handle(:heartbeat, _payload, state) do Logger.debug("HEARTBEAT PING") - {state, Payload.heartbeat_payload(state.seq)} + {{state, Payload.heartbeat_payload(state.seq)}, []} end def handle(:heartbeat_ack, _payload, state) do Logger.debug("HEARTBEAT_ACK") - %{state | last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true} + {%{state | last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}, []} end - def handle(:hello, payload, state) do - state = %{ - state - | heartbeat_interval: payload.d.heartbeat_interval - } - - GenServer.cast(state.conn_pid, :heartbeat) + def handle(:hello, payload, old_state) do + state = Map.put(old_state, :heartbeat_interval, payload.d.heartbeat_interval) + # Jitter it as documented. But only for Hello - the subsequent timeouts + # must not jitter it anymore, but send out at this interval. 
+ heartbeat_next = :rand.uniform(state.heartbeat_interval) + heartbeat_action = {:state_timeout, heartbeat_next, :send_heartbeat} if session_exists?(state) do Logger.info("RESUMING") - {state, Payload.resume_payload(state)} + {{state, Payload.resume_payload(state)}, heartbeat_action} else Logger.info("IDENTIFYING") - {state, Payload.identity_payload(state)} + {{state, Payload.identity_payload(state)}, heartbeat_action} end end def handle(:invalid_session, _payload, state) do Logger.info("INVALID_SESSION") - {state, Payload.identity_payload(state)} + {{state, Payload.identity_payload(state)}, []} end def handle(:reconnect, _payload, state) do diff --git a/lib/nostrum/shard/session.ex b/lib/nostrum/shard/session.ex index c259e4c91..85154aa1c 100644 --- a/lib/nostrum/shard/session.ex +++ b/lib/nostrum/shard/session.ex @@ -1,5 +1,46 @@ defmodule Nostrum.Shard.Session do - @moduledoc false + @moduledoc """ + Manages a single shard's gateway connection. + + + ## Purpose + + Discord's gateway sends us events over websocket. The shard session state + machine concerns itself with parsing these events and dispatching them to + clients as appropriate. + + + > ### Internal module {: .info} + > + > This module is intended for exclusive usage inside of nostrum, and is + > documented for completeness and people curious to look behind the covers. + + + ## Inner workings + + The session is implemented via `:gen_statem` and can be in one of the + following states: + + - `disconnected`: when no connection is up at all. On initial connection of + the session (i.e. no `seq` field is available), this will block if we need to + wait a moment to respect the session startup concurrency limits. Afterwards, + it will transition to `connecting_http`. + + - `connecting_http`: We are setting up an HTTP connection to the API. This + means that no connection was available previously at all, and we need to open + it from scratch. 
Once `:gun` notifies us that the connection is up, we + transition to the `connecting_ws` state. + + - `connecting_ws`: We are turning the HTTP connection into a WebSocket + connection. This is used both for the initial connection and also for later + gateway-requested reconnections. If this succeeds, we head into the + `connected` state. + + - `connected`: The WebSocket connection is up. This state actively deals with + new data from the gateway, and takes care of heartbeating. If Discord fails + to respond to our heartbeats, we close down the full connection and attempt + to re-establish and resume events. + """ alias Nostrum.{Constants, Util} alias Nostrum.Shard.{Connector, Event, Payload} @@ -7,14 +48,15 @@ defmodule Nostrum.Shard.Session do require Logger - use GenServer + @behaviour :gen_statem + # Query string to connect to when upgrading the connection. @gateway_qs "/?compress=zlib-stream&encoding=etf&v=10" - # Maximum time the initial connection may take, in milliseconds. - @timeout_connect 10_000 - # Maximum time the websocket upgrade may take, in milliseconds. - @timeout_ws_upgrade 10_000 + # Maximum time the initial connection may take. + @timeout_connect :timer.seconds(5) + # Maximum time the websocket upgrade may take. + @timeout_ws_upgrade :timer.seconds(5) # Messages to buffer at a time. Decremented by :gun by 1 for every message we # receive. If this reaches zero, `:gun` will stop reading events from # upstream. Equivalent to setting `{active, false}` on the socket at `0`. 
@@ -31,153 +73,227 @@ defmodule Nostrum.Shard.Session do end payload = Payload.status_update_payload(idle_since, game, stream, status, afk, type) - GenServer.cast(pid, {:status_update, payload}) + :gen_statem.cast(pid, {:status_update, payload}) end def update_voice_state(pid, guild_id, channel_id, self_mute, self_deaf) do payload = Payload.update_voice_state_payload(guild_id, channel_id, self_mute, self_deaf) - GenServer.cast(pid, {:update_voice_state, payload}) + :gen_statem.cast(pid, {:update_voice_state, payload}) end def request_guild_members(pid, guild_id, limit \\ 0) do payload = Payload.request_members_payload(guild_id, limit) - GenServer.cast(pid, {:request_guild_members, payload}) + :gen_statem.cast(pid, {:request_guild_members, payload}) end def get_ws_state(pid) do - GenServer.call(pid, :get_ws_state) + :sys.get_state(pid) end - def start_link([gateway, shard_num, total]) do - GenServer.start_link(__MODULE__, [gateway, shard_num, total], - spawn_opt: [Util.fullsweep_after()] - ) - end + # State machine API - def init([_gateway, _shard_num, _total] = args) do - {:ok, nil, {:continue, args}} + def start_link([_gateway, _shard_num, _total] = shard_opts, statem_opts) do + :gen_statem.start_link(__MODULE__, shard_opts, statem_opts) end - def handle_continue([gateway, shard_num, total_shards], nil) do - Connector.block_until_connect() + def init([gateway, shard_num, total]) do Logger.metadata(shard: shard_num) - gun_opts = %{protocols: [:http], retry: 1_000_000_000, tls_opts: Constants.gun_tls_opts()} - {:ok, worker} = :gun.open(:binary.bin_to_list(gateway), 443, gun_opts) - {:ok, :http} = :gun.await_up(worker, @timeout_connect) - stream = :gun.ws_upgrade(worker, @gateway_qs, [], %{flow: @standard_flow}) - {:upgrade, ["websocket"], _} = :gun.await(worker, stream, @timeout_ws_upgrade) - - zlib_context = :zlib.open() - :zlib.inflateInit(zlib_context) - state = %WSState{ conn_pid: self(), - conn: worker, shard_num: shard_num, - total_shards: total_shards, - 
stream: stream, - gateway: gateway <> @gateway_qs, - last_heartbeat_ack: DateTime.utc_now(), - heartbeat_ack: true, - zlib_ctx: zlib_context + total_shards: total, + gateway: gateway } - Logger.debug(fn -> "Websocket connection up on worker #{inspect(worker)}" end) + connect = {:next_event, :internal, :connect} + {:ok, :disconnected, state, connect} + end + + def callback_mode, do: [:state_functions, :state_enter] - {:noreply, state} + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts, []]}, + type: :worker, + restart: :permanent, + shutdown: 500 + } end - def handle_info({:gun_ws, _worker, stream, {:binary, frame}}, state) do - payload = - state.zlib_ctx - |> :zlib.inflate(frame) - |> :erlang.iolist_to_binary() - |> :erlang.binary_to_term() + # Ok, we just came here, and we're disconnected. Before we try to connect, + # we need to wait for the shard connector to tell us it's okay to connect. + def disconnected(:enter, _previous_state, %{seq: nil} = data) do + # XXX: Should this be non-blocking? + :ok = Connector.block_until_connect() + {:next_state, :disconnected, data} + end - updated_state = %{state | seq: payload.s || state.seq} + # We've had a connection before. Since we won't identify with IDENTIFY + # but with RESUME, we don't need to respect the concurrency limit, as + # it only applies to identify. 
+ def disconnected(:enter, _previous_state, data) do + {:next_state, :disconnected, data} + end - from_handle = - payload.op - |> Constants.atom_from_opcode() - |> Event.handle(payload, updated_state) + def disconnected(:internal, :connect, data) do + {:next_state, :connecting_http, data} + end + + def disconnected(_kind, _request, _data) do + {:keep_state_and_data, :postpone} + end - :ok = :gun.update_flow(updated_state.conn, stream, @standard_flow) + def connecting_http(:enter, _from, %{gateway: gateway} = data) do + set_timeout = {:state_timeout, @timeout_connect, :connect_timeout} - case from_handle do - {new_state, reply} -> - :ok = :gun.ws_send(updated_state.conn, stream, {:binary, reply}) - {:noreply, new_state} + gun_opts = %{ + retry: 3, + protocols: [:http], + tls_opts: Constants.gun_tls_opts() + } - new_state -> - {:noreply, new_state} - end + {:ok, worker} = :gun.open(:binary.bin_to_list(gateway), 443, gun_opts) + _monitor = :erlang.monitor(:process, worker) + {:keep_state, %{data | conn: worker}, set_timeout} end - def handle_info({:gun_ws, _conn, _stream, :close}, state) do - Logger.info("Shard websocket closed (unknown reason)") - {:noreply, state} + def connecting_http(:info, {:gun_up, conn, :http}, %{conn: conn} = data) do + {:next_state, :connecting_ws, data} end - def handle_info({:gun_ws, _conn, _stream, {:close, errno, reason}}, state) do - Logger.info("Shard websocket closed (errno #{errno}, reason #{inspect(reason)})") - {:noreply, state} + def connecting_http(:state_timeout, :connect_timeout, _data) do + {:stop, :connect_http_timeout} + end + + def connecting_http(_kind, _request, _data) do + {:keep_state_and_data, :postpone} end - def handle_info( - {:gun_down, _conn, _proto, _reason, _killed_streams}, - state + def connecting_ws(:enter, _from, %{conn: conn} = data) do + set_timeout = {:state_timeout, @timeout_ws_upgrade, :upgrade_timeout} + stream = :gun.ws_upgrade(conn, @gateway_qs, [], %{flow: @standard_flow}) + {:keep_state, %{data | 
stream: stream}, set_timeout} + end + + def connecting_ws( + :info, + {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, + %{zlib_ctx: nil} = data ) do - # Try to cancel the internal timer, but - # do not explode if it was already cancelled. - :timer.cancel(state.heartbeat_ref) - {:noreply, state} + zlib_context = :zlib.open() + :zlib.inflateInit(zlib_context) + + {:next_state, :connected, + %{data | zlib_ctx: zlib_context, last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}} + end + + def connecting_ws( + :info, + {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, + %{zlib_ctx: zlib_ctx} = data + ) do + Logger.info("Re-established websocket connection") + :ok = :zlib.inflateReset(zlib_ctx) + + {:next_state, :connected, + %{data | last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}} end - def handle_info({:gun_up, worker, _proto}, state) do - :ok = :zlib.inflateReset(state.zlib_ctx) - stream = :gun.ws_upgrade(worker, @gateway_qs, [], %{flow: @standard_flow}) - {:upgrade, ["websocket"], _} = :gun.await(worker, stream, @timeout_ws_upgrade) - Logger.warn("Reconnected after connection broke") - {:noreply, %{state | heartbeat_ack: true}} + def connecting_ws(:state_timeout, :upgrade_timeout, _data) do + {:stop, :connect_ws_timeout} end - def handle_cast({:status_update, payload}, state) do - :ok = :gun.ws_send(state.conn, state.stream, {:binary, payload}) - {:noreply, state} + def connecting_ws(_kind, _request, _data) do + {:keep_state_and_data, :postpone} end - def handle_cast({:update_voice_state, payload}, state) do - :ok = :gun.ws_send(state.conn, state.stream, {:binary, payload}) - {:noreply, state} + # We don't need to specially handle resuming here, because Shard.Event will + # adjust our initial payload accordingly. 
+ def connected(:enter, _from, _data) do + :keep_state_and_data end - def handle_cast({:request_guild_members, payload}, state) do - :ok = :gun.ws_send(state.conn, state.stream, {:binary, payload}) - {:noreply, state} + def connected(:info, {:gun_ws, _worker, stream, {:binary, frame}}, data) do + payload = + data.zlib_ctx + |> :zlib.inflate(frame) + |> :erlang.iolist_to_binary() + |> :erlang.binary_to_term() + + data_with_seq = %{data | seq: payload.s || data.seq} + + {from_handle, heartbeat_actions} = + payload.op + |> Constants.atom_from_opcode() + |> Event.handle(payload, data_with_seq) + + :ok = :gun.update_flow(data_with_seq.conn, stream, @standard_flow) + + case from_handle do + {updated_data, reply} -> + :ok = :gun.ws_send(data_with_seq.conn, stream, {:binary, reply}) + {:keep_state, updated_data, heartbeat_actions} + + updated_data -> + {:keep_state, updated_data, heartbeat_actions} + end end - def handle_cast(:heartbeat, %{heartbeat_ack: false, heartbeat_ref: timer_ref} = state) do - Logger.warn("heartbeat_ack not received in time, disconnecting") - {:ok, :cancel} = :timer.cancel(timer_ref) - :gun.ws_send(state.conn, state.stream, :close) - {:noreply, state} + def connected(:info, {:gun_ws, conn, stream, :close}, %{stream: stream} = data) do + Logger.info("Shard websocket closed (unknown reason)") + :gun.flush(conn) + {:next_state, :connecting_ws, %{data | stream: nil}} end - def handle_cast(:heartbeat, state) do - {:ok, ref} = - :timer.apply_after(state.heartbeat_interval, :gen_server, :cast, [ - state.conn_pid, - :heartbeat - ]) + def connected(:info, {:gun_ws, conn, _stream, {:close, errno, reason}}, data) do + Logger.info("Shard websocket closed (errno #{errno}, reason #{inspect(reason)})") + :gun.flush(conn) + {:next_state, :connecting_ws, %{data | stream: nil}} + end - :ok = :gun.ws_send(state.conn, state.stream, {:binary, Payload.heartbeat_payload(state.seq)}) + def connected( + :info, + {:gun_down, conn, _proto, _reason, _killed_streams}, + %{conn: 
conn} = data + ) do + Logger.info("Lost complete shard connection. Attempting reconnect.") + :gun.flush(conn) + connect = {:next_event, :internal, :connect} + {:next_state, :disconnected, %{data | conn: nil, stream: nil}, connect} + end - {:noreply, - %{state | heartbeat_ref: ref, heartbeat_ack: false, last_heartbeat_send: DateTime.utc_now()}} + def connected(:cast, {request, payload}, %{conn: conn, stream: stream}) + when request in [:status_update, :update_voice_state, :request_guild_members] do + :ok = :gun.ws_send(conn, stream, {:binary, payload}) + :keep_state_and_data end - def handle_call(:get_ws_state, _sender, state) do - {:reply, state, state} + def connected( + :state_timeout, + :send_heartbeat = request, + %{ + conn: conn, + seq: seq, + stream: stream, + heartbeat_ack: heartbeat_ack, + heartbeat_interval: heartbeat_interval + } = data + ) do + if heartbeat_ack do + # Our last heartbeat was acknowledged. Send another one. + :ok = :gun.ws_send(conn, stream, {:binary, Payload.heartbeat_payload(seq)}) + heartbeat_later = {:state_timeout, heartbeat_interval, request} + + {:keep_state, %{data | heartbeat_ack: false, last_heartbeat_send: DateTime.utc_now()}, + heartbeat_later} + else + # Our last heartbeat was not acknowledged. Disconnect and try to resume. 
+ Logger.warn("Heartbeat ack not received in time, reconnecting") + :ok = :gun.ws_send(conn, stream, :close) + connect = {:next_event, :internal, :connect} + {:next_state, :disconnected, %{data | stream: nil}, connect} + end end end diff --git a/lib/nostrum/struct/ws_state.ex b/lib/nostrum/struct/ws_state.ex index 571e12f93..dcc07c3fe 100644 --- a/lib/nostrum/struct/ws_state.ex +++ b/lib/nostrum/struct/ws_state.ex @@ -8,7 +8,6 @@ defmodule Nostrum.Struct.WSState do :total_shards, :seq, :session, - :shard_pid, :conn, :conn_pid, :stream, @@ -17,7 +16,6 @@ defmodule Nostrum.Struct.WSState do :last_heartbeat_ack, :heartbeat_ack, :heartbeat_interval, - :heartbeat_ref, :zlib_ctx ] @@ -39,9 +37,6 @@ defmodule Nostrum.Struct.WSState do @typedoc "The session id" @type session :: integer | nil - @typedoc "PID of the shard containing this state" - @type shard_pid :: pid - @typedoc "PID of the `:gun` worker connected to the websocket" @type conn :: pid @@ -73,9 +68,6 @@ defmodule Nostrum.Struct.WSState do @typedoc "Interval at which heartbeats are sent" @type heartbeat_interval :: pos_integer() | nil - @typedoc "Time ref for the heartbeat" - @type heartbeat_ref :: :timer.tref() | nil - @typedoc "Reference to the current zlib context" @type zlib_ctx :: reference | nil @@ -84,7 +76,6 @@ defmodule Nostrum.Struct.WSState do total_shards: total_shards, seq: seq, session: session, - shard_pid: shard_pid, conn: conn, conn_pid: conn_pid, stream: stream, @@ -93,7 +84,6 @@ defmodule Nostrum.Struct.WSState do last_heartbeat_ack: last_heartbeat_ack, heartbeat_ack: heartbeat_ack, heartbeat_interval: heartbeat_interval, - heartbeat_ref: heartbeat_ref, zlib_ctx: zlib_ctx } end From e5c6964b1277c2185556dbe001b6e6f8c9b17e2b Mon Sep 17 00:00:00 2001 From: Johannes Christ Date: Thu, 1 Jun 2023 12:21:42 +0200 Subject: [PATCH 2/4] Name top-level supervisor --- appup.ex | 6 ++++-- lib/nostrum/application.ex | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/appup.ex b/appup.ex 
index b0d5306cc..d0a5e450c 100644 --- a/appup.ex +++ b/appup.ex @@ -6,7 +6,9 @@ # Upgrade instructions {'0.9.0-alpha1', [ - {:load_module, Nostrum.Cache.GuildCache} + # Top shard supervisor was not registered, so could not restart shard + # supervisor to load new gateway logic + {:restart_application, :nostrum} ]}, {'0.8.0', [ @@ -17,7 +19,7 @@ # Downgrade instructions {'0.9.0-alpha1', [ - {:load_module, Nostrum.Cache.GuildCache} + {:restart_application, :nostrum} ]}, {'0.8.0', [ diff --git a/lib/nostrum/application.ex b/lib/nostrum/application.ex index c4cd0437c..270bb2ce3 100644 --- a/lib/nostrum/application.ex +++ b/lib/nostrum/application.ex @@ -36,7 +36,7 @@ defmodule Nostrum.Application do if Application.get_env(:nostrum, :dev), do: Supervisor.start_link(children ++ [DummySupervisor], strategy: :one_for_one), - else: Supervisor.start_link(children, strategy: :one_for_one) + else: Supervisor.start_link(children, strategy: :one_for_one, name: Nostrum.Supervisor) end defp check_executables do From 7ebffa9da8a31091c32d7d3203a979f0c5594ffe Mon Sep 17 00:00:00 2001 From: Johannes Christ Date: Fri, 2 Jun 2023 10:52:51 +0200 Subject: [PATCH 3/4] Use the resume_gateway URL for resuming --- lib/nostrum/shard/event.ex | 4 +++- lib/nostrum/shard/session.ex | 7 +++++++ lib/nostrum/struct/ws_state.ex | 6 ++++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/lib/nostrum/shard/event.ex b/lib/nostrum/shard/event.ex index a9c8f447b..3edff41f9 100644 --- a/lib/nostrum/shard/event.ex +++ b/lib/nostrum/shard/event.ex @@ -20,7 +20,9 @@ defmodule Nostrum.Shard.Event do if payload.t == :READY do Logger.info("READY") - {%{state | session: payload.d.session_id}, []} + + {%{state | session: payload.d.session_id, resume_gateway: payload.d.resume_gateway_url}, + []} else {state, []} end diff --git a/lib/nostrum/shard/session.ex b/lib/nostrum/shard/session.ex index 85154aa1c..fe7a13337 100644 --- a/lib/nostrum/shard/session.ex +++ b/lib/nostrum/shard/session.ex @@ -145,6 
+145,13 @@ defmodule Nostrum.Shard.Session do {:keep_state_and_data, :postpone} end + # If we've been here before, we want to use the resume gateway URL to connect + # instead of the regular gateway URL. Clear it so the clause below runs next. + def connecting_http(:enter, from, %{resume_gateway: resume_gateway} = data) + when resume_gateway != nil do + connecting_http(:enter, from, %{data | gateway: resume_gateway, resume_gateway: nil}) + end + def connecting_http(:enter, _from, %{gateway: gateway} = data) do set_timeout = {:state_timeout, @timeout_connect, :connect_timeout} diff --git a/lib/nostrum/struct/ws_state.ex b/lib/nostrum/struct/ws_state.ex index dcc07c3fe..ead4bc5cf 100644 --- a/lib/nostrum/struct/ws_state.ex +++ b/lib/nostrum/struct/ws_state.ex @@ -12,6 +12,7 @@ defmodule Nostrum.Struct.WSState do :conn_pid, :stream, :gateway, + :resume_gateway, :last_heartbeat_send, :last_heartbeat_ack, :heartbeat_ack, @@ -50,6 +51,10 @@ defmodule Nostrum.Struct.WSState do @typedoc "Gateway URL" @type gateway :: String.t() + @typedoc "Gateway URL to use for resuming." 
+ @typedoc since: "0.9.0" + @type resume_gateway :: String.t() | nil + @typedoc """ The time the last heartbeat was sent, if a heartbeat hasn't been sent it will be the time the websocket process was started @@ -80,6 +85,7 @@ defmodule Nostrum.Struct.WSState do conn_pid: conn_pid, stream: stream, gateway: gateway, + resume_gateway: resume_gateway, last_heartbeat_send: last_heartbeat_send, last_heartbeat_ack: last_heartbeat_ack, heartbeat_ack: heartbeat_ack, From 0b66b268ebd3b3e348b7cfcd6da41612a04facc9 Mon Sep 17 00:00:00 2001 From: Johannes Christ Date: Fri, 2 Jun 2023 10:54:49 +0200 Subject: [PATCH 4/4] Brutally close connection on gun_down Similar reason as for the ratelimiter --- lib/nostrum/shard/event.ex | 3 +-- lib/nostrum/shard/session.ex | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/nostrum/shard/event.ex b/lib/nostrum/shard/event.ex index 3edff41f9..73866e859 100644 --- a/lib/nostrum/shard/event.ex +++ b/lib/nostrum/shard/event.ex @@ -21,8 +21,7 @@ defmodule Nostrum.Shard.Event do if payload.t == :READY do Logger.info("READY") - {%{state | session: payload.d.session_id, resume_gateway: payload.d.resume_gateway_url}, - []} + {%{state | session: payload.d.session_id, resume_gateway: payload.d.resume_gateway_url}, []} else {state, []} end diff --git a/lib/nostrum/shard/session.ex b/lib/nostrum/shard/session.ex index fe7a13337..41dc0558e 100644 --- a/lib/nostrum/shard/session.ex +++ b/lib/nostrum/shard/session.ex @@ -266,7 +266,11 @@ defmodule Nostrum.Shard.Session do %{conn: conn} = data ) do Logger.info("Lost complete shard connection. Attempting reconnect.") - :gun.flush(conn) + # Brutally close to make sure we don't mess up the state machine + # due to gun reconnecting automatically. For the WebSocket disconnect + # case, this is fine. + :ok = :gun.close(conn) + :ok = :gun.flush(conn) connect = {:next_event, :internal, :connect} {:next_state, :disconnected, %{data | conn: nil, stream: nil}, connect} end