Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(shard): manually shard connect and reconnect #596

Merged
merged 12 commits into from
May 22, 2024
Merged
23 changes: 20 additions & 3 deletions lib/nostrum/shard.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
defmodule Nostrum.Shard do
@moduledoc false

use Supervisor
use Supervisor, restart: :transient

alias Nostrum.Shard.Session

def start_link([_, shard_num, _total] = opts) do
def start_link({:connect, [_, shard_num, _total]} = opts) do
Supervisor.start_link(__MODULE__, opts, name: :"Nostrum.Shard-#{shard_num}")
end

def start_link(
{:reconnect,
%{shard: [_gateway, shard_num, _total], resume_gateway: _resume_gateway, seq: _seq}} =
opts
) do
Supervisor.start_link(__MODULE__, opts, name: :"Nostrum.Shard-#{shard_num}")
end

def start_link([_, _shard_num, _total] = opts) do
start_link({:connect, opts})
end

def init(opts) do
children = [
{Session, opts}
# TODO: Add per shard ratelimiter
# TODO: Add per shard cache
]

Supervisor.init(children, strategy: :one_for_all, max_restarts: 3, max_seconds: 60)
Supervisor.init(children,
strategy: :one_for_all,
max_restarts: 3,
max_seconds: 60,
auto_shutdown: :any_significant
)
end
end
77 changes: 75 additions & 2 deletions lib/nostrum/shard/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,38 @@ defmodule Nostrum.Shard.Session do
:gen_statem.cast(pid, {:request_guild_members, payload})
end

def disconnect(pid) do
:gen_statem.call(pid, {:disconnect, nil})
end

def disconnect(pid, timeout) do
:gen_statem.call(pid, {:disconnect, nil}, timeout)
end

def get_ws_state(pid) do
:sys.get_state(pid)
end

# State machine API

def start_link({:connect, [_gateway, _shard_num, _total]} = opts, statem_opts) do
:gen_statem.start_link(__MODULE__, opts, statem_opts)
end

def start_link(
{:reconnect,
%{shard: [_gateway, _shard_num, _total], resume_gateway: _resume_gateway, seq: _seq}} =
opts,
statem_opts
) do
:gen_statem.start_link(__MODULE__, opts, statem_opts)
end

def start_link([_gateway, _shard_num, _total] = shard_opts, statem_opts) do
:gen_statem.start_link(__MODULE__, shard_opts, statem_opts)
end

def init([gateway, shard_num, total]) do
def init({:connect, [gateway, shard_num, total]}) do
Logger.metadata(shard: shard_num)

state = %WSState{
Expand All @@ -110,14 +131,44 @@ defmodule Nostrum.Shard.Session do
{:ok, :disconnected, state, connect}
end

def init(
{:reconnect,
%{
shard: [gateway, shard_num, total],
resume_gateway: resume_gateway,
seq: seq,
session: session
}}
) do
Logger.metadata(shard: shard_num)

state = %WSState{
conn_pid: self(),
shard_num: shard_num,
total_shards: total,
gateway: gateway,
resume_gateway: resume_gateway,
session: session,
seq: seq
}

connect = {:next_event, :internal, :connect}
{:ok, :disconnected, state, connect}
end

def init([gateway, shard_num, total]) do
init({:connect, [gateway, shard_num, total]})
end

def callback_mode, do: [:state_functions, :state_enter]

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts, []]},
type: :worker,
restart: :permanent,
restart: :transient,
significant: true,
shutdown: 500
}
end
Expand Down Expand Up @@ -302,6 +353,28 @@ defmodule Nostrum.Shard.Session do
:keep_state_and_data
end

def connected({:call, from}, {:disconnect, nil}, %{
conn: conn,
shard_num: shard_num,
total_shards: total,
gateway: gateway,
resume_gateway: resume_gateway,
seq: seq,
session: session
}) do
:ok = :gun.close(conn)
:ok = :gun.flush(conn)

{:stop_and_reply, :normal,
{:reply, from,
%{
shard: [gateway, shard_num, total],
resume_gateway: resume_gateway,
session: session,
seq: seq
}}}
end

def connected(
:state_timeout,
:send_heartbeat = request,
Expand Down
66 changes: 49 additions & 17 deletions lib/nostrum/shard/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Nostrum.Shard.Supervisor do
handle yet, thus growing the message queue and the memory usage.
"""

use Supervisor
use DynamicSupervisor

alias Nostrum.Error.CacheError
alias Nostrum.Shard
Expand Down Expand Up @@ -66,19 +66,31 @@ defmodule Nostrum.Shard.Supervisor do
def start_link(_args) do
{url, gateway_shard_count} = Util.gateway()

value = Application.get_env(:nostrum, :num_shards, :auto)
shard_range = cast_shard_range(gateway_shard_count, value)
on_start =
DynamicSupervisor.start_link(
__MODULE__,
nil,
name: __MODULE__
)

Supervisor.start_link(
__MODULE__,
[url, shard_range],
name: __MODULE__
)
case Application.get_env(:nostrum, :num_shards, :auto) do
:unstable_manual ->
on_start

value ->
{lowest, highest, total} = cast_shard_range(gateway_shard_count, value)

shard_range = lowest..highest

for num <- shard_range, do: connect([url, num - 1, total])
end

on_start
end

def update_status(status, game, stream, type) do
__MODULE__
|> Supervisor.which_children()
|> DynamicSupervisor.which_children()
|> Enum.filter(fn {_id, _pid, _type, [modules]} -> modules == Nostrum.Shard end)
|> Enum.map(fn {_id, pid, _type, _modules} -> Supervisor.which_children(pid) end)
|> List.flatten()
Expand All @@ -102,18 +114,38 @@ defmodule Nostrum.Shard.Supervisor do
end

@doc false
def init([url, {lowest, highest, total}]) do
shard_range = lowest..highest
children = for num <- shard_range, do: create_worker(url, num - 1, total)

Supervisor.init(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 60)
def init(_) do
DynamicSupervisor.init(strategy: :one_for_one, max_restarts: 3, max_seconds: 60)
end

@doc false
def create_worker(gateway, shard_num, total) do
Supervisor.child_spec(
{Shard, [gateway, shard_num, total]},
id: shard_num
{Shard, [gateway, shard_num, total]}
end

def disconnect(shard_num) do
:"Nostrum.Shard-#{shard_num}"
|> Supervisor.which_children()
|> Enum.find(fn {id, _pid, _type, _modules} -> id == Nostrum.Shard.Session end)
|> elem(1)
|> Session.disconnect()
end

def connect([url, num, total]) do
DynamicSupervisor.start_child(__MODULE__, create_worker(url, num, total))
end

def reconnect(
%{
shard: [_gateway, _shard_num, _total],
resume_gateway: _resume_gateway,
seq: _seq,
session: _session
} = opts
) do
DynamicSupervisor.start_child(
__MODULE__,
{Shard, {:reconnect, opts}}
)
end
end