diff --git a/guides/intro/api.md b/guides/intro/api.md index 3f3bffd7b..1b6e7ff4d 100644 --- a/guides/intro/api.md +++ b/guides/intro/api.md @@ -55,7 +55,7 @@ they're given or using `Nostrum.Api.request/4` to call an endpoint. To ensure that every request is handled properly, no matter if they're called asynchronously or not, nostrum funnels all requests through the -`Nostrum.Api.Ratelimiter` GenServer. +`Nostrum.Api.Ratelimiter` state machine. ## REST-only diff --git a/guides/intro/intro.md b/guides/intro/intro.md index 007451c55..cbab6713f 100644 --- a/guides/intro/intro.md +++ b/guides/intro/intro.md @@ -107,8 +107,8 @@ The following options are only used for testing nostrum itself. ## Logging nostrum uses Elixir's standard logger to inform you about regular and irregular -events. Normal messages include Discord-requested shard reconnections, -ratelimiter waits, and the `IDENTIFY` and `READY` events. +events. Normal messages include Discord-requested shard reconnections and the +`IDENTIFY` and `READY` events. The following metadata fields through logger: diff --git a/lib/nostrum/api/base.ex b/lib/nostrum/api/base.ex index 881717c55..5b9512eea 100644 --- a/lib/nostrum/api/base.ex +++ b/lib/nostrum/api/base.ex @@ -9,8 +9,7 @@ defmodule Nostrum.Api.Base do @type methods :: :get | :post | :put | :delete @spec request(pid, methods(), String.t(), iodata(), [{String.t(), String.t()}], Enum.t()) :: - {:error, :timeout | {:connection_error, any} | {:down, any} | {:stream_error, any}} - | {:ok, {non_neg_integer, [{String.t(), String.t()}], binary}} + :gun.stream_ref() def request(conn, method, route, body, raw_headers, params) do headers = process_request_headers(raw_headers) # Convert method from atom to string for `:gun` @@ -23,23 +22,7 @@ defmodule Nostrum.Api.Base do full_route = "#{base_route()}#{route}?#{query_string}" headers = process_request_headers(headers, body) - stream = :gun.request(conn, method, full_route, headers, process_request_body(body)) - - case :gun.await(conn, stream) do - {:response, :fin, status, headers} -> - {:ok, {status, headers, ""}} - - {:response, :nofin, status, headers} -> - {:ok, body} = :gun.await_body(conn, stream) - {:ok, {status, headers, body}} - - {:error, :timeout} = result -> - Logger.debug("Request for #{inspect(full_route)} timed out") - result - - {:error, _reason} = result -> - result - end + :gun.request(conn, method, full_route, headers, process_request_body(body)) end def process_request_headers(headers, ""), do: :proplists.delete("content-type", headers) diff --git a/lib/nostrum/api/ratelimiter.ex b/lib/nostrum/api/ratelimiter.ex index 2bdefd997..4612ae2ac 100644 --- a/lib/nostrum/api/ratelimiter.ex +++ b/lib/nostrum/api/ratelimiter.ex @@ -2,6 +2,7 @@ defmodule Nostrum.Api.Ratelimiter do @moduledoc """ Handles REST calls to the Discord API while respecting ratelimits. + ## Purpose Discord's API returns information about ratelimits that we must respect. This @@ -9,188 +10,590 @@ defmodule Nostrum.Api.Ratelimiter do thus preventing concurrency issues from arising if two processes make a remote API call at the same time. + + > ### Internal module {: .info} + > + > This module is intended for exclusive usage inside of nostrum, and is + > documented for completeness and people curious to look behind the covers. + + + ## Asynchronous requests + + The ratelimiter is fully asynchronous internally. In theory, it also supports + queueing requests in an asynchronous manner. However, support for this is + currently not implemented in `Nostrum.Api`. 
+
+  If you want to make one or multiple asynchronous requests manually, you can
+  use the following pattern:
+
+  ```elixir
+  req = :gen_statem.send_request(Nostrum.Api.Ratelimiter, {:queue, request})
+  # ...
+  response = :gen_statem.receive_response(req, timeout)
+  ```
+
+  where `request` is a map describing the request to run - see `Nostrum.Api`
+  for more information. You can also send multiple requests at the same time
+  and wait for their responses: see `:gen_statem.reqids_add/3` and
+  `:gen_statem.wait_response/3` for more information.
+
+
+  ## Multi-node
+
+  If a single global process is desired to handle all ratelimiting, the
+  ratelimiter can theoretically be adjusted to start registered via `:global`.
+  In practice, it may be more beneficial to run a local ratelimiter process on
+  each node and either use the local one for any API calls, or use a
+  consistent hash mechanism to distribute API requests around the cluster as
+  needed.
+
+
+  ## Inner workings
+
   When a client process wants to perform some request on the Discord API, it
-  sends a request to the `GenServer` behind this module to ask it to `:queue`
+  sends a request to the `:gen_statem` behind this module to ask it to `:queue`
   the incoming request.

-  The server looks up the ratelimit buckets for the given endpoint using the
-  configured `Nostrum.Store.RatelimitBucket`. If no bucket is available, a
-  request will be sent out directly, and the server will wait for the response.
-
-  After receiving a response, the ratelimiter updates the matching ratelimit
-  bucket and return the response to the client.
-
-  If the client disconnects from the ratelimiter, or the request is dropped by
-  the ratelimiter for another reason - usually a timeout - while the request is
-  still existing on Discord's end, the Ratelimiter will log the response later
-  when it receives it.
-
-  ### Serialization and buckets
-
-  We serialize all REST requests in nostrum through this process to prevent
-  concurrency issues arising from multiple clients exhausting the bucket (e.g.
-  going to a `t:Nostrum.Store.RatelimitBucket.remaining/0` value below `0`).
-  This critical spot only needs to happen shortly before running the request,
-  but **only if we already have a bucket for the coming request**. If we do not
-  have a bucket already, we must serialize it and not make further requests for
-  the same bucket until we have received information from Discord on the
-  ratelimits on the given endpoint. Otherwise, we may end up running multiple
-  requests to the same endpoint because no bucket was stored to tell us that we
-  shouldn't. A more efficient alternative may be only blocking requests to the
-  specific bucket we have sent a request to by keeping track of "unbucketed
-  running requests" and removing elements as we retrieve bucket information.
+
+  ### Connection setup
+
+  If the state machine is not connected to the HTTP endpoint, it will
+  transition to the `:connecting` state and try to open the connection. If this
+  succeeds, it transitions to the `:connected` state.
+
+  The state machine associates a `t::queue.queue/1` of `t:queued_request/0`
+  with each individual bucket, together with an internal count of remaining
+  calls. If an entry is found with remaining calls above 0, the request is
+  scheduled for immediate execution. If an entry is found with remaining calls
+  of 0, or with the special `:initial` value (indicating that the initial
+  request to find the ratelimit has just gone out), it is queued. Otherwise, if
+  no entry is found, a new queue is created with an `:initial` remaining call
+  count, and the request is scheduled for immediate execution.
+
+  The request starting function, `:next`, will start new requests from the
+  queue as long as more calls are possible in the timeframe. Requests are
+  started asynchronously. Bookkeeping is set up to associate the resulting
+  `t::gun.stream_ref/0` with the original client along with its request and the
+  ratelimiter bucket.
+
+  Results from the HTTP connection are delivered non-blocking: responses
+  consisting of only a status code and no body (code `204`) are sent in a
+  single message, while other responses are delivered to us incrementally. To
+  deliver the full response body to the client once the final part has
+  arrived, an internal buffer of the body is kept. A possible future
+  optimization could be having a way for `:gun` to only send the ratelimiter
+  state machine the initial `:gun_response` and forward any part of the body
+  directly to the client.
+
+  When the headers for a request have been received, the ratelimiter parses the
+  ratelimit information and starts an internal timer that fires when the
+  ratelimits expire. It will also reschedule calls with the `:next` internal
+  event for as many remaining calls as it knows about. Once the timer expires
+  for the current bucket, two cases can happen:
+
+  - The queue has items: Schedule all items and repeat this later.
+
+  - The queue is empty: Delete the queue and remaining calls from the outstanding buckets.
+
+  In practice, this means that we never store more information than we need,
+  and it removes the regular bucket sweeping that the previous ratelimit
+  bucket storage required.
+
+  **Global ratelimits** are handled with the special `global_limit` state. This
+  state is only entered with a state timeout, the state timeout being the
+  `X-Ratelimit-Reset-After` value provided in the global ratelimit response.
+  This state does nothing apart from postponing any events it receives and
+  returning to the previous state (`:connected`) once the global timeout has
+  passed.
+
+
+  ### Failure modes
+
+  #### HTTP connection death
+
+  If the HTTP connection dies, the ratelimiter will inform each affected client
+  by replying with `{:error, {:connection_died, reason}}`, where `reason` is
+  the reason as provided by the `:gun_down` event. It will then transition to
+  the `:disconnected` state. If no requests were running at the time the
+  connection was shut down - for instance, because we simply reached the
+  maximum idle time on the HTTP/2 connection - we will simply move on.
+
+  #### Other internal issues
+
+  Any other internal problems that are not handled appropriately in the
+  ratelimiter will crash it, effectively resulting in the complete loss of any
+  queued requests.
+
+
+  ### Implementation benefits & drawbacks
+
+  #### A history of ratelimiting
+
+  First, it is important to give a short history of nostrum's ratelimiting: pre
+  `0.8`, nostrum used a `GenServer` that would call out to ETS tables to look
+  up ratelimiting buckets for requests. If it needed to sleep before issuing a
+  request due to the bucket being exhausted, it would do so in the server
+  process and block other callers.
+
+  In nostrum 0.8, the existing ratelimiter bucket storage architecture was
+  refactored to be based around the [pluggable caching
+  functionality](../advanced/pluggable_caching.md), and buckets with no
+  remaining calls were adjusted to be slept out on the client side by having
+  the `GenServer` respond to the client with `{:error, {:retry_after, millis}}`
+  and the client trying again and again to schedule its requests. This allowed
+  users to distribute their ratelimit buckets however they wished; out of the
+  box, nostrum shipped with an ETS-based and a Mnesia-based ratelimit bucket
+  store.
+
+
+  #### Problems we solved
+
+  The approach above still came with a few problems:
+
+  - Requests were still being done synchronously in the ratelimiter, and it was
+  blocked from doing anything else whilst running a request, even though we are
+  theoretically free to start requests for other buckets while one is still
+  running.
+
+  - The ratelimiter was half working on its own and half reliant on the
+  external storage mechanisms, which made the code hard to follow and required
+  regular automatic pruning because the store had no idea when a bucket was no
+  longer relevant on its own.
+
+  - Requests would not be pipelined to run as soon as possible.
+
+  - The ratelimiter did not inform clients if their request died in-flight.
+
+  - If the client disconnected before we returned the response, we had to
+  handle this explicitly via `handle_info`.
+
+  The new state machine-based ratelimiter solves these problems.
   """

-  use GenServer
+  @behaviour :gen_statem

   alias Nostrum.Api.Base
   alias Nostrum.Constants
   alias Nostrum.Error.ApiError
-  alias Nostrum.Store.RatelimitBucket
-  alias Nostrum.Util

   require Logger

-  # Ratelimits are waited out on client-side. This constant determines the
-  # attempts to requeue an individual request if the ratelimiter told us that
-  # we should wait. Once this many attempts have been made at queueing a
-  # request, further attempts are aborted.
-  @default_attempts_to_requeue 50
+  @major_parameters ["channels", "guilds", "webhooks"]
+  @registered_name __MODULE__

-  # Total attempts to try to reconnect to the API in case it is not reachable.
-  # The current amount of reconnect attempts is intentionally geared to
-  # basically reconnect forever, because the ratelimiter process currently
-  # always assumes a working connection. The `handle_info` callbacks for the
-  # `:gun_up` and `:gun_down` events may be of interest.
-  @reconnect_attempts 1_000_000_000
+  @typedoc """
+  A bucket for endpoints under the same ratelimit.
+  """
+  @typedoc since: "0.9.0"
+  @type bucket :: String.t()

-  # How often to prune stale buckets in the ratelimiter bucket storage.
-  @bucket_cleanup_interval :timer.hours(1)
+  @typedoc """
+  A request to make in the ratelimiter.
+  """
+  @typedoc since: "0.9.0"
+  @type request :: %{
+          method: :get | :post | :put | :delete,
+          route: String.t(),
+          body: iodata(),
+          headers: [{String.t(), String.t()}],
+          params: Enum.t()
+        }

-  # How far back to prune when running a stale bucket cleanup. Any ratelimiter
-  # buckets older than the interval given here will be removed every
-  # `@bucket_cleanup_interval` milliseconds.
-  @bucket_cleanup_window @bucket_cleanup_interval
+  @typedoc """
+  A bucket-specific request waiting to be queued, alongside its client.
+  """
+  @typedoc since: "0.9.0"
+  @type queued_request :: {request(), client :: :gen_statem.from()}

   @typedoc """
-  Return values of start functions.
+  Remaining calls on a route, as provided by the API response.
+
+  The ratelimiter internally counts the remaining calls per route to dispatch
+  new requests as soon as it's capable of doing so, but this is only possible
+  if the API already provided us with ratelimit information for an endpoint.
+
+  Therefore, when the initial call on an endpoint is made, the special
+  `:initial` value is specified. This is used by the limit parsing function to
+  set the remaining calls if and only if it is the response for the initial
+  call - otherwise, the value won't represent the truth anymore.
   """
-  @type on_start ::
-          {:ok, pid}
-          | :ignore
-          | {:error, {:already_started, pid} | term}
+  @typedoc since: "0.9.0"
+  @type remaining :: non_neg_integer() | :initial

-  @major_parameters ["channels", "guilds", "webhooks"]
-  @gregorian_epoch 62_167_219_200
-  @registered_name __MODULE__
+  @typedoc """
+  The state of the ratelimiter.
+
+  While this has no public use, it is still documented here to provide help
+  when tracing the ratelimiter via `:sys.trace/2` or other means.
+
+  ## Fields
+
+  - `:outstanding`: Outstanding (unqueued) requests per bucket, along with
+  the remaining calls that may be made on said bucket.
+
+  - `:running`: Requests that have been sent off. Used to associate the
+  client back with its request when the response comes in.
+
+  - `:inflight`: Requests for which we have started getting a response, but we
+  have not fully received it yet. For responses that have a body, this will
+  buffer their body until we can send it back to the client.
+
+  - `:conn`: The `:gun` connection backing the server. Used for making new
+  requests, and updated as the state changes.
+  """
+  @typedoc since: "0.9.0"
+  @type state :: %{
+          outstanding: %{
+            bucket => {remaining, :queue.queue(queued_request)}
+          },
+          running: %{
+            :gun.stream_ref() => {bucket(), request(), :gen_statem.from()}
+          },
+          inflight: %{
+            :gun.stream_ref() =>
+              {status :: non_neg_integer(), headers :: [{String.t(), String.t()}],
+               body :: String.t()}
+          },
+          conn: pid() | nil
+        }

   @doc """
   Starts the ratelimiter.
   """
-  @spec start_link([]) :: on_start
-  def start_link([]) do
-    GenServer.start_link(__MODULE__, [], name: @registered_name)
+  @spec start_link([:gen_statem.start_opt()]) :: :gen_statem.start_ret()
+  def start_link(opts) do
+    :gen_statem.start_link({:local, @registered_name}, __MODULE__, [], opts)
   end

   def init([]) do
+    # Uncomment the following to trace everything the ratelimiter is doing:
+    # me = self()
+    # spawn(fn -> :sys.trace(me, true) end)
+    # See more examples in the `sys` docs.
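+    # Note (hedged example, not part of the original diff): a running,
+    # registered ratelimiter can also be traced from the shell, e.g. via
+    #   :sys.trace(Nostrum.Api.Ratelimiter, true)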
+    {:ok, :disconnected, empty_state()}
+  end
+
+  def callback_mode, do: :state_functions
+
+  def child_spec(opts) do
+    %{
+      id: __MODULE__,
+      start: {__MODULE__, :start_link, [opts]},
+      type: :worker,
+      restart: :permanent,
+      shutdown: 500
+    }
+  end
+
+  # The Glorious State Machine
+  # Inspired by Peter Morgan's "Postpone: Resource Allocation on Demand"
+  # https://shortishly.com/blog/postpone-resource-allocation-on-demand/
+
+  def disconnected({:call, _from}, _request, data) do
+    {:next_state, :connecting, data,
+     [
+       {:next_event, :internal, :open},
+       {:state_timeout, :timer.seconds(10), :connect_timeout},
+       :postpone
+     ]}
+  end
+
+  def connecting(:internal, :open, data) do
     domain = to_charlist(Constants.domain())

-    open_opts = %{retry: @reconnect_attempts, tls_opts: Constants.gun_tls_opts()}
+    open_opts = %{
+      connect_timeout: :timer.seconds(5),
+      domain_lookup_timeout: :timer.seconds(5),
+      retry: 3,
+      tls_handshake_timeout: :timer.seconds(5),
+      tls_opts: Constants.gun_tls_opts()
+    }
+
     {:ok, conn_pid} = :gun.open(domain, 443, open_opts)
+    {:keep_state, %{data | conn: conn_pid}}
+  end

-    {:ok, :http2} = :gun.await_up(conn_pid)
+  def connecting(:info, {:gun_up, conn_pid, :http2}, %{conn: conn_pid} = data) do
+    {:next_state, :connected, data}
+  end

-    # Start the old route cleanup loop
-    Process.send_after(self(), :remove_old_buckets, @bucket_cleanup_interval)
+  def connecting({:call, _from}, _request, _data) do
+    {:keep_state_and_data, :postpone}
+  end

-    {:ok, conn_pid}
+  def connecting(:state_timeout, :connect_timeout, _data) do
+    {:stop, :connect_timeout}
   end

-  defp get_retry_time(route, method) do
-    route
-    |> get_endpoint(method)
-    |> RatelimitBucket.timeout_for()
+  def connected({:call, from}, {:queue, request}, %{outstanding: outstanding} = data) do
+    bucket = get_endpoint(request.route, request.method)
+
+    # The outstanding map contains pairs in the form `{remaining, queue}`,
+    # where `remaining` is the number of remaining calls we may make, and
+    # `queue` is the waiting line of requests. If the ratelimit on the bucket
+    # expires, the internal timeout event will automatically reschedule queued
+    # requests (starting with a single one to get the calls we may make).
+    case Map.get(outstanding, bucket) do
+      # We have no remaining calls, or the initial call to get rate limiting
+      # information is in flight. Let's join the waiting line.
+      {remaining, queue} when remaining in [0, :initial] ->
+        entry = {request, from}
+
+        data_with_this_queued =
+          put_in(data, [:outstanding, bucket], {remaining, :queue.in(entry, queue)})
+
+        {:keep_state, data_with_this_queued}
+
+      # There is an entry - so somebody did find some ratelimiting information
+      # here recently - but that entry tells us we may make a call right away.
+      {remaining, queue} when remaining > 0 ->
+        # Sanity check. This can be removed after the release is considered
+        # stable. Why should this be empty? Because when we receive ratelimit
+        # information and see that there are still items in the queue, we
+        # should internally schedule them right away. Otherwise, we are
+        # mixing up the order.
+        true = :queue.is_empty(queue)
+        {:keep_state_and_data, [{:next_event, :internal, {:run, request, bucket, from}}]}

+      # There is no entry. We are the pioneer for this bucket.
+      nil ->
+        # Since we don't have any explicit ratelimiting information for this
+        # bucket yet, we set the remaining calls to zero. While the first
+        # request is in flight, we do not want any further requests to be sent
+        # out until we have ratelimit information from it, at which point other
+        # requests are run from the queue.
+        run_request = {:next_event, :internal, {:run, request, bucket, from}}
+        data_with_new_queue = put_in(data, [:outstanding, bucket], {0, :queue.new()})
+        {:keep_state, data_with_new_queue, [run_request]}
+    end
   end

-  @doc """
-  Queue the given request.
+  # Run the given request right now, and do any bookkeeping.
+  def connected(:internal, {:run, request, bucket, from}, %{conn: conn} = data) do
+    stream =
+      Base.request(
+        conn,
+        request.method,
+        request.route,
+        request.body,
+        request.headers,
+        request.params
+      )
+
+    data_with_this_running = put_in(data, [:running, stream], {bucket, request, from})
+    {:keep_state, data_with_this_running}
+  end

-  If the ratelimiter tells us to sit it out and we have more than `0` attempts
-  remaining, we sleep out the given retry time and ask it to queue again
-  afterwards.
-  """
-  def queue(request, attempts_remaining \\ @default_attempts_to_requeue) do
-    case GenServer.call(@registered_name, {:queue, request}) do
-      {:error, {:retry_after, time}} when attempts_remaining > 0 ->
-        truncated = :erlang.ceil(time)
+  # `:next` will run the next `remaining` requests from the given bucket's
+  # queue, and stop as soon as no more entries are found.
+  def connected(:internal, {:next, 0, _bucket}, _data) do
+    :keep_state_and_data
+  end

-        attempt = @default_attempts_to_requeue - attempts_remaining + 1
+  def connected(:internal, {:next, remaining, bucket}, %{outstanding: outstanding} = data) do
+    {^remaining, queue} = Map.fetch!(outstanding, bucket)

-        Logger.info(
-          "RATELIMITER: Waiting #{truncated}ms to process request with route #{request.route} (try #{attempt} / #{@default_attempts_to_requeue})"
-        )
+    case :queue.out(queue) do
+      {:empty, _queue} ->
+        # Nobody wants to run anything on the bucket. We can hop out.
+        :keep_state_and_data

-        Process.sleep(truncated)
-        queue(request, attempts_remaining - 1)
+      {{:value, {request, from}}, updated_queue} ->
+        # We found a request we can start. Account for the request, start it,
+        # and then try and see if we can start another one, repeating the cycle
+        # until we have either exhausted the queue of waiting requests or the
+        # remaining calls on the endpoint.
+        accounted_remaining = remaining - 1

-      # We've had enough. Bail.
-      {:error, {:retry_after, _time}} when attempts_remaining == 0 ->
-        {:error, :max_retry_attempts_exceeded}
+        outstanding_without_this =
+          Map.put(outstanding, bucket, {accounted_remaining, updated_queue})

-      result ->
-        result
+        run_request = {:next_event, :internal, {:run, request, bucket, from}}
+        try_starting_next = {:next_event, :internal, {:next, accounted_remaining, bucket}}
+
+        {:keep_state, %{data | outstanding: outstanding_without_this},
+         [run_request, try_starting_next]}
     end
   end

-  def handle_call({:queue, request}, _from, conn) do
-    # Do the final serialized double-take of ratelimits to prevent races.
-    # The client already did it, but, you know.
-    case get_retry_time(request.route, request.method) do
-      :now ->
-        {:reply, do_request(request, conn), conn}
+  # The bucket's ratelimit window has expired: we may make calls again. Or, to
+  # be more specific, we may make a single call to find out how many calls we
+  # will have remaining in the next window. If there are waiting entries, we
+  # start scheduling.
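+  #
+  # Note that this generic timeout is armed by the `:parse_limits` handler
+  # below via a `{{:timeout, bucket}, reset_after, :expired}` action, so it
+  # fires once per ratelimit window.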
+  def connected(
+        {:timeout, bucket},
+        :expired,
+        %{outstanding: outstanding} = data
+      )
+      when is_map_key(outstanding, bucket) do
+    # "remaining" is mostly worthless here, since the bucket's remaining calls
+    # have now reset anyway.
+    {_remaining, queue} = Map.fetch!(outstanding, bucket)
+
+    case :queue.out(queue) do
+      {:empty, _queue} ->
+        # Nobody else has anything to queue, so we're good on cleaning up the bucket.
+        {:keep_state, %{data | outstanding: Map.delete(outstanding, bucket)}}
+
+      {{:value, {request, from}}, updated_queue} ->
+        # There's more where that came from. Update the stored queue and
+        # schedule the request to run instantly. Since this is the initial
+        # request to get the new ratelimit, we also set the special marker.
+        outstanding_with_this = Map.put(outstanding, bucket, {:initial, updated_queue})
+        run_request = {:next_event, :internal, {:run, request, bucket, from}}
+        {:keep_state, %{data | outstanding: outstanding_with_this}, [run_request]}
+    end
+  end

-      time ->
-        {:reply, {:error, {:retry_after, time}}, conn}
+  # Beginning of the response. For responses without a body, this is the
+  # complete response. For responses with a body, we set up the buffer here. In
+  # either case, we parse the retrieved ratelimiting information here.
+  # The "status != 429" guard should probably be removed on release such that
+  # we can deal with global ratelimits properly.
+  def connected(
+        :info,
+        {:gun_response, _conn, stream, kind, status, headers},
+        %{inflight: inflight, running: running} = data
+      )
+      when status != 429 do
+    {bucket, _request, from} = Map.fetch!(running, stream)
+    response = parse_response(status, headers)
+    limits = parse_headers(response)
+    parse_limits = {:next_event, :internal, {:parse_limits, limits, bucket}}
+
+    case kind do
+      :nofin ->
+        inflight_with_this = Map.put(inflight, stream, {status, headers, ""})
+        {:keep_state, %{data | inflight: inflight_with_this}, parse_limits}
+
+      :fin ->
+        running_without_this = Map.delete(running, stream)
+
+        {:keep_state, %{data | running: running_without_this},
+         [
+           {:reply, from, format_response(response)},
+           parse_limits
+         ]}
     end
   end

-  # The gun connection went down. Any requests in `_killed_streams` are definitely gone.
-  # Other streams may also be gone. Gun will reconnect automatically for us.
-  def handle_info({:gun_down, _conn, _proto, _reason, _killed_streams}, state) do
-    {:noreply, state}
+  # A part of a response body arrived: append it to the buffer for the stream.
+  def connected(:info, {:gun_data, _conn, stream, :nofin, body}, %{inflight: inflight} = data) do
+    inflight_with_buffer =
+      Map.update!(
+        inflight,
+        stream,
+        fn {status, headers, buffer} ->
+          {status, headers, <<buffer::binary, body::binary>>}
+        end
+      )
+
+    {:keep_state, %{data | inflight: inflight_with_buffer}}
   end

-  # Gun automatically reconnected after the connection went down previously.
-  def handle_info({:gun_up, _conn, _proto}, state) do
-    {:noreply, state}
+  # The final part of a response body arrived: assemble the full body and
+  # reply to the client.
+  def connected(
+        :info,
+        {:gun_data, _conn, stream, :fin, body},
+        %{inflight: inflight, running: running} = data
+      ) do
+    {{_bucket, _request, from}, running_without_this} = Map.pop(running, stream)
+    {{status, headers, buffer}, inflight_without_this} = Map.pop(inflight, stream)
+    full_buffer = <<buffer::binary, body::binary>>
+    unparsed = parse_response(status, headers, full_buffer)
+    response = format_response(unparsed)
+
+    {:keep_state, %{data | inflight: inflight_without_this, running: running_without_this},
+     {:reply, from, response}}
   end

-  def handle_info({:gun_response, _conn, _ref, :nofin, status, _headers}, state) do
-    Logger.debug(
-      "Got unexpected (probably late) HTTP response with status #{status}, discarding it"
-    )
-
-    {:noreply, state}
-  end
+  # Parse limits and deal with them accordingly by scheduling the bucket expiry
+  # timeout and scheduling the next requests to run as appropriate.
+  def connected(
+        :internal,
+        {:parse_limits, {:bucket_limit, {remaining, reset_after}}, bucket},
+        %{outstanding: outstanding} = data
+      ) do
+    expire_bucket = {{:timeout, bucket}, reset_after, :expired}
+
+    case Map.fetch!(outstanding, bucket) do
+      # This is the first response we got for the absolute initial call.
+      # Update the remaining value to the reported value.
+      {:initial, queue} ->
+        updated_outstanding = Map.put(outstanding, bucket, {remaining, queue})
+
+        {:keep_state, %{data | outstanding: updated_outstanding},
+         [
+           expire_bucket,
+           {:next_event, :internal, {:next, remaining, bucket}}
+         ]}
+
+      # We already have some information about the remaining calls saved. In
+      # that case, don't touch it - just try to reschedule and `:next` will do
+      # the rest.
+      # Why not update the `remaining` value? If we update it to the value
+      # reported in the response, it may jump up again, but the remaining
+      # value, once set, must strictly monotonically decrease. We count the
+      # requests we have running down to zero in the state machine, but when
+      # the first response from Discord comes in, it may tell us we have four
+      # remaining calls while in reality multiple other requests are already in
+      # flight and ready to cause their ratelimiter engineers some headaches.
+      # Therefore, we must rely on our own value (and on the API to not decide
+      # to change the ratelimit halfway through the bucket lifetime).
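+      #
+      # Concrete example (hypothetical numbers): we fire five requests into a
+      # fresh bucket. The first response reports `remaining: 4`, but the other
+      # four requests are already in flight, so our own counter - which has
+      # already counted down to 0 - is the value to trust.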
+      {stored_remaining, _queue} ->
+        {:keep_state_and_data,
+         [
+           expire_bucket,
+           {:next_event, :internal, {:next, stored_remaining, bucket}}
+         ]}
+    end
+  end

+  def connected(:internal, {:parse_limits, {:global_limit, retry_after}, _bucket}, data) do
+    {:next_state, :global_limit, data, [{:state_timeout, retry_after, :connected}]}
   end

-  def handle_info({:gun_data, _conn, _ref, _conn_state, _data}, state) do
-    Logger.debug("Got unexpected (probably late) data from a HTTP response, discarding it")
-    {:noreply, state}
+  def connected(:info, {:gun_down, conn, :http2, reason, killed_streams}, %{running: running}) do
+    :ok = :gun.flush(conn)
+
+    # Reply to the stored client (`from`) of each killed stream.
+    replies =
+      Enum.map(killed_streams, fn stream ->
+        {_bucket, _request, from} = Map.fetch!(running, stream)
+        {:reply, from, {:error, {:connection_died, reason}}}
+      end)
+
+    {:next_state, :disconnected, empty_state(), replies}
   end

-  def handle_info(:remove_old_buckets, state) do
-    RatelimitBucket.cleanup(@bucket_cleanup_window)
-    Process.send_after(self(), :remove_old_buckets, @bucket_cleanup_interval)
-    {:noreply, state}
+  def global_limit(:state_timeout, next, data) do
+    {:next_state, next, data}
   end

-  defp do_request(request, conn) do
-    conn
-    |> Base.request(request.method, request.route, request.body, request.headers, request.params)
-    |> handle_headers(get_endpoint(request.route, request.method))
-    |> format_response
+  def global_limit(_event, _request, _data) do
+    {:keep_state_and_data, :postpone}
+  end
+
+  defp parse_response(status, headers), do: {:ok, {status, headers, ""}}
+
+  defp parse_response(status, headers, buffer),
+    do: {:ok, {status, headers, buffer}}
+
+  @spec empty_state :: state()
+  defp empty_state,
+    do: %{
+      outstanding: %{},
+      running: %{},
+      inflight: %{},
+      conn: nil
+    }
+
+  # Helper functions
+
+  @doc """
+  Queue the given request and wait for the response synchronously.
+
+  Ratelimits on the endpoint are handled by the ratelimiter. Global ratelimits
+  will cause this to return an error.
+  """
+  def queue(request) do
+    :gen_statem.call(@registered_name, {:queue, request})
   end

   @spec value_from_rltuple({String.t(), String.t()}) :: String.t() | nil
@@ -203,65 +606,24 @@ defmodule Nostrum.Api.Ratelimiter do
     |> value_from_rltuple()
   end

-  defp handle_headers({:error, reason}, _route), do: {:error, reason}
-
-  defp handle_headers({:ok, {_status, headers, _body}} = response, route) do
-    # Per https://discord.com/developers/docs/topics/rate-limits, all of these
-    # headers are optional, which is why we supply a default of 0.
+  # defp parse_headers({:error, _reason} = result), do: result
+  defp parse_headers({:ok, {_status, headers, _body}}) do
     global_limit = header_value(headers, "x-ratelimit-global")
-
     remaining = header_value(headers, "x-ratelimit-remaining")
     remaining = unless is_nil(remaining), do: String.to_integer(remaining)
-
-    reset = header_value(headers, "x-ratelimit-reset")
-    reset = unless is_nil(reset), do: :erlang.trunc(:math.ceil(String.to_float(reset)))
-
-    retry_after = header_value(headers, "retry-after")
+    reset_after = header_value(headers, "x-ratelimit-reset-after")

-    retry_after =
-      unless is_nil(retry_after) do
-        # Since for some reason this might not contain a "."
- # and String.to_float raises if it doesn't - {retry_after, ""} = Float.parse(retry_after) - :erlang.trunc(:math.ceil(retry_after)) - end + reset_after = + unless is_nil(reset_after), + do: :erlang.trunc(:math.ceil(String.to_float(reset_after) * 1000)) - origin_timestamp = - headers - |> header_value("date", "0") - |> date_string_to_unix - - latency = abs(origin_timestamp - Util.now()) - - # If we have hit a global limit, Discord responds with a 429 and informs - # us when we can retry. Our global bucket keeps track of this ratelimit. - unless is_nil(global_limit), do: update_global_bucket(route, 0, retry_after, latency) - - # If Discord did send us other ratelimit information, we can also update - # the ratelimiter bucket for this route. For some endpoints, such as - # when creating a DM with a user, we may not retrieve ratelimit headers. - unless is_nil(reset) or is_nil(remaining), do: update_bucket(route, remaining, reset, latency) - - response - end - - defp update_bucket(route, remaining, reset_time, latency) do - RatelimitBucket.update(route, remaining, reset_time * 1000, latency) - end - - defp update_global_bucket(_route, _remaining, retry_after, latency) do - RatelimitBucket.update("GLOBAL", 0, retry_after + Util.now(), latency) - end - - defp date_string_to_unix(header) do - header - |> String.to_charlist() - |> :httpd_util.convert_request_date() - |> erl_datetime_to_timestamp - end - - defp erl_datetime_to_timestamp(datetime) do - (:calendar.datetime_to_gregorian_seconds(datetime) - @gregorian_epoch) * 1000 + if is_nil(global_limit) do + {:bucket_limit, {remaining, reset_after}} + else + {:global_limit, reset_after} + end end @doc """ @@ -291,8 +653,8 @@ defmodule Nostrum.Api.Ratelimiter do defp format_response(response) do case response do - {:error, error} -> - {:error, error} + # {:error, error} -> + # {:error, error} {:ok, {status, _, body}} when status in [200, 201] -> {:ok, body} diff --git a/lib/nostrum/store/ratelimit_bucket.ex b/lib/nostrum/store/ratelimit_bucket.ex deleted file mode 100644 index 0458df39b..000000000 --- a/lib/nostrum/store/ratelimit_bucket.ex +++ /dev/null @@ -1,151 +0,0 @@ -defmodule Nostrum.Store.RatelimitBucket do - @default_implementation __MODULE__.ETS - @moduledoc """ - Behaviour & dispatcher for storing ratelimit buckets. - - ## Purpose - - Calls to Discord's API are ratelimited, and we are informed about our - remaining calls by Discord after issuing them. This module concerns itself - with storing this information to prevent running into HTTP 429 errors. As - this is used mainly by `Nostrum.Api.Ratelimiter`, it is unlikely you need to - use it directly yourself. - - ## Configuration - - By default, nostrum will use `#{@default_implementation}` to store ratelimit - buckets. To override this, set the `[:stores, :ratelimit_buckets]` setting on - nostrum's application configuration: - - ```elixir - config :nostrum, - stores: %{ - ratelimit_buckets: MyBot.Nostrum.Store.RatelimitBucket - } - ``` - - This setting must be set at compile time. - - ## Implementation - - If implementing your own ratelimit bucket store, in addition to the callbacks - of this module, you must also provide the function `child_spec/1`. The - recommended approach is to spawn a `Supervisor` to manage your store. - """ - @moduledoc since: "0.8.0" - - @configured_store :nostrum - |> Application.compile_env( - [:stores, :ratelimit_buckets], - @default_implementation - ) - - alias Nostrum.Util - - @typedoc """ - The route a bucket applies to. 
- - The constant value `"GLOBAL"` is used for the global bucket. - """ - @type route :: String.t() - - @typedoc "Remaining calls for a bucket." - @type remaining :: non_neg_integer() - - @typedoc "Time at which a bucket resets, in milliseconds." - @type reset_time :: pos_integer() - - @typedoc "Latency between us and our latest call for this bucket, in milliseconds." - @type latency :: pos_integer() - - @typedoc "Individual bucket information for a route." - @type bucket :: {route(), remaining(), reset_time(), latency()} - - @doc """ - Update the given bucket to have the given remaining calls. - - Normally used after issuing an API call for a previously used route. - """ - @callback update(route(), remaining()) :: :ok - - @doc """ - Update the given bucket with full rate limit information. - - Whilst `c:update/2` is based around the assumption that you only know the - remaining calls (for instance, after issuing one), this function is used - after receiving full rate limiting information after an API call. - """ - @callback update(route(), remaining(), reset_time(), latency()) :: :ok - - @doc """ - Retrieve bucket information by route. - - If no information is available, return `nil`. - """ - @callback lookup(route()) :: bucket() | nil - - @doc """ - Clean up entries in the bucket older than the given amount of milliseconds. - - This function is called automatically by the ratelimiter in regular - intervals. - - Return the amount of deleted entries. - """ - @callback cleanup(pos_integer()) :: non_neg_integer() - - @doc """ - Retrieve the child specification for starting this mapping under a supervisor. - """ - @callback child_spec(term()) :: Supervisor.child_spec() - - @doc """ - Receive the time to wait before issuing more calls to the given route. - - This function must only be called prior to issuing the actual API call: it - will decrement the remaining calls counter from the given bucket. - - Nostrum takes the API latency into account when issuing these calls, and uses - the current time to determine when it expires. It is therefore assumed that - each node has a relatively equal latency to the API, and the nodes have - little to no time drift. - """ - @spec timeout_for(route()) :: remaining() | :now - @spec timeout_for(route(), module()) :: remaining() | :now - def timeout_for(route, store \\ @configured_store) do - case store.lookup(route) do - # XXX: In multi-node - or rather, with multiple rate limiter processes, - # this poses a race condition. It needs to be atomic. - {route, remaining, _reset_time, _latency} when remaining > 0 -> - store.update(route, remaining - 1) - :now - - {_route, _remaining, reset_time, latency} -> - # Oh fuck. What the fuck did I just get myself into. I just realized: - # if we introduce some shared storage for this, whatever it is, then we - # probably also need to introduce some form of fucking time - # synchronization or some other absolute hellspawn of evilkind into - # this pure project. How would the latency be of value for the other - # servers? Well, if it's the same data center or even the same city our - # state, it's probably fine. But a unix timestamp? In milliseconds? - # Distributed across servers? What the fuck, man. Do people configure - # ntpd properly? Do people even know what ntpd is anymore? Do I even - # know what I'm doing anymore? 
- - case reset_time - Util.now() + latency do - time when time <= 0 -> :now - time -> time - end - - nil -> - :now - end - end - - defdelegate update(route, remaining), to: @configured_store - defdelegate update(route, remaining, reset_time, latency), to: @configured_store - defdelegate lookup(route), to: @configured_store - defdelegate cleanup(age), to: @configured_store - @doc false - defdelegate child_spec(opts), to: @configured_store -end diff --git a/lib/nostrum/store/ratelimit_bucket/ets.ex b/lib/nostrum/store/ratelimit_bucket/ets.ex deleted file mode 100644 index d995c3d70..000000000 --- a/lib/nostrum/store/ratelimit_bucket/ets.ex +++ /dev/null @@ -1,92 +0,0 @@ -defmodule Nostrum.Store.RatelimitBucket.ETS do - @moduledoc """ - Stores ratelimit buckets via `:ets`. - - If programmatic access to the ETS table is needed, please use the `table/0` - function. - - Please do not use this module directly, apart from special functions such as - `table/0`. Use `Nostrum.Store.RatelimitBucket` to call the configured - mapping instead. - """ - @moduledoc since: "0.8.0" - - alias Nostrum.Store.RatelimitBucket - alias Nostrum.Util - use Supervisor - - @behaviour RatelimitBucket - - @table_name :nostrum_ratelimit_buckets - - @doc "Retrieve the ETS table reference used for the store." - @spec table :: :ets.table() - def table, do: @table_name - - @doc "Start the supervisor." - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @doc "Set up the store's ETS table." - @impl Supervisor - def init(_init_arg) do - :ets.new(@table_name, [:set, :public, :named_table]) - Supervisor.init([], strategy: :one_for_one) - end - - @impl RatelimitBucket - @doc "Update an existing routes remaining calls." - @spec update(RatelimitBucket.route(), RatelimitBucket.remaining()) :: :ok - def update(route, remaining) do - :ets.update_element(@table_name, route, {2, remaining}) - :ok - end - - @impl RatelimitBucket - @doc "Set up a new ratelimiter bucket from the given arguments." - @spec update( - RatelimitBucket.route(), - RatelimitBucket.remaining(), - RatelimitBucket.reset_time(), - RatelimitBucket.latency() - ) :: :ok - def update(route, remaining, reset_time, latency) do - :ets.insert(@table_name, {route, remaining, reset_time, latency}) - :ok - end - - @impl RatelimitBucket - @doc "Look up the most relevant ratelimiter bucket for the given route." - @spec lookup(RatelimitBucket.route()) :: RatelimitBucket.bucket() | nil - def lookup(route) do - route_time = :ets.lookup(@table_name, route) - global_time = :ets.lookup(@table_name, "GLOBAL") - - max = - Enum.max_by([route_time, global_time], fn info -> - case info do - [] -> -1 - [{_route, _remaining, reset_time, _latency}] -> reset_time - end - end) - - case max do - [] -> nil - [entry] -> entry - end - end - - @impl RatelimitBucket - @doc "Clean up bucket entries older than `age` milliseconds." 
- @spec cleanup(pos_integer()) :: non_neg_integer() - def cleanup(age) do - then = Util.now() - age - - # created from :ets.fun2ms( - # fn {_, _, reset_time, _} when reset_time < then -> true end - # ) - match_spec = [{{:_, :_, :"$1", :_}, [{:<, :"$1", then}], [true]}] - :ets.select_delete(@table_name, match_spec) - end -end diff --git a/lib/nostrum/store/ratelimit_bucket/mnesia.ex b/lib/nostrum/store/ratelimit_bucket/mnesia.ex deleted file mode 100644 index 472d7beb5..000000000 --- a/lib/nostrum/store/ratelimit_bucket/mnesia.ex +++ /dev/null @@ -1,133 +0,0 @@ -if Code.ensure_loaded?(:mnesia) do - defmodule Nostrum.Store.RatelimitBucket.Mnesia do - @moduledoc """ - Stores ratelimit buckets using Mnesia. - - #{Nostrum.Cache.Base.mnesia_note()} - """ - @moduledoc since: "0.8.0" - - alias Nostrum.Store.RatelimitBucket - alias Nostrum.Util - - @behaviour RatelimitBucket - - @table_name :nostrum_ratelimit_buckets - @record_name @table_name - - use Supervisor - - @doc "Retrieve the Mnesia table name used for the store." - @spec table :: atom() - def table, do: @table_name - - @doc "Start the supervisor." - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @doc "Drop the table used for the store." - @spec teardown() :: {:atomic, :ok} | {:aborted, term()} - def teardown, do: :mnesia.delete_table(@table_name) - - @doc "Set up the store's Mnesia table." - @impl Supervisor - def init(_init_arg) do - options = [ - attributes: [:route, :remaining, :reset_time, :latency], - record_name: @record_name - ] - - case :mnesia.create_table(@table_name, options) do - {:atomic, :ok} -> :ok - {:aborted, {:already_exists, _tab}} -> :ok - end - - Supervisor.init([], strategy: :one_for_one) - end - - @impl RatelimitBucket - @doc "Update an existing routes remaining calls." - @spec update(RatelimitBucket.route(), RatelimitBucket.remaining()) :: :ok - def update(route, remaining) do - :ok = - :mnesia.activity(:sync_transaction, fn -> - case :mnesia.read(@table_name, route, :write) do - [result] -> - :mnesia.write(put_elem(result, 2, remaining)) - - [] -> - :ok - end - end) - end - - @impl RatelimitBucket - @doc "Set up a new ratelimiter bucket from the given arguments." - @spec update( - RatelimitBucket.route(), - RatelimitBucket.remaining(), - RatelimitBucket.reset_time(), - RatelimitBucket.latency() - ) :: :ok - def update(route, remaining, reset_time, latency) do - :ok = - :mnesia.activity(:sync_transaction, fn -> - :mnesia.write({@record_name, route, remaining, reset_time, latency}) - end) - end - - @impl RatelimitBucket - @doc "Look up the most relevant ratelimiter bucket for the given route." - @spec lookup(RatelimitBucket.route()) :: RatelimitBucket.bucket() | nil - def lookup(route) do - {route_time, global_time} = - :mnesia.activity(:sync_transaction, fn -> - {:mnesia.read(@table_name, route), :mnesia.read(@table_name, "GLOBAL")} - end) - - max = - Enum.max_by([route_time, global_time], fn info -> - case info do - [] -> -1 - [{_tag, _route, _remaining, reset_time, _latency}] -> reset_time - end - end) - - case max do - [] -> nil - [{_tag, route, remaining, reset_time, latency}] -> {route, remaining, reset_time, latency} - end - end - - @impl RatelimitBucket - @doc "Clean up bucket entries older than `age` milliseconds." 
- @spec cleanup(pos_integer()) :: non_neg_integer() - def cleanup(age) do - then = Util.now() - age - - match_spec = [{{:_, :"$1", :_, :"$2", :_}, [{:<, :"$2", then}], ["$1"]}] - - :mnesia.activity(:sync_transaction, fn -> - do_delete(:mnesia.select(@table_name, match_spec, 100, :write)) - end) - end - - defp do_delete(starter) do - do_delete(0, starter) - end - - defp do_delete(deleted, {[route | matches], cont}) do - :mnesia.delete(@table_name, route, :write) - do_delete(deleted + 1, {matches, cont}) - end - - defp do_delete(deleted, {[], cont}) do - do_delete(deleted, :mnesia.select(cont)) - end - - defp do_delete(deleted, :"$end_of_table") do - deleted - end - end -end diff --git a/lib/nostrum/store/supervisor.ex b/lib/nostrum/store/supervisor.ex index a3890f39f..04c2cf965 100644 --- a/lib/nostrum/store/supervisor.ex +++ b/lib/nostrum/store/supervisor.ex @@ -4,7 +4,6 @@ defmodule Nostrum.Store.Supervisor do Please see the following modules for more details: - `Nostrum.Store.GuildShardMapping` - - `Nostrum.Store.RatelimitBucket` - `Nostrum.Store.UnavailableGuild` """ @moduledoc since: "0.8.0" @@ -17,7 +16,6 @@ defmodule Nostrum.Store.Supervisor do def init([]) do children = [ - Nostrum.Store.RatelimitBucket, Nostrum.Store.GuildShardMapping, Nostrum.Store.UnavailableGuild ] diff --git a/mix.exs b/mix.exs index 48c2ebb90..186c12de4 100644 --- a/mix.exs +++ b/mix.exs @@ -61,7 +61,6 @@ defmodule Nostrum.Mixfile do Nostrum.Constants, Nostrum.Store, Nostrum.Store.GuildShardMapping, - Nostrum.Store.RatelimitBucket, Nostrum.Store.UnavailableGuild, Nostrum.Struct, Nostrum.Struct.Event @@ -87,7 +86,7 @@ defmodule Nostrum.Mixfile do def groups_for_modules do [ Api: [ - ~r/Nostrum.Api/, + ~r/Nostrum.Api$/, ~r/Nostrum.Consumer/, ~r/Nostrum.(Permission|Voice)/ ], @@ -112,6 +111,7 @@ defmodule Nostrum.Mixfile do ~r/Nostrum.Store.\w+.\w+$/ ], "Internal modules": [ + ~r/Nostrum.Api.Ratelimiter/, ~r/Nostrum.Shard/ ] ] diff --git a/test/nostrum/store/ratelimit_bucket_meta_test.exs b/test/nostrum/store/ratelimit_bucket_meta_test.exs deleted file mode 100644 index b2f2772f4..000000000 --- a/test/nostrum/store/ratelimit_bucket_meta_test.exs +++ /dev/null @@ -1,69 +0,0 @@ -defmodule Nostrum.Store.RatelimitBucketMetaTest do - alias Nostrum.Store.RatelimitBucket - use ExUnit.Case, async: true - - @store_modules [ - # Dispatcher - RatelimitBucket, - # Implementations - RatelimitBucket.ETS, - RatelimitBucket.Mnesia - ] - - for store <- @store_modules do - defmodule :"#{store}Test" do - use ExUnit.Case - @store store - doctest @store - - setup do - if function_exported?(@store, :teardown, 0) do - # Silence bogus warning with `apply` - on_exit(:teardown, fn -> apply(@store, :teardown, []) end) - end - - [pid: start_supervised!(@store)] - end - - describe "empty lookup" do - test "returns nothing" do - refute @store.lookup("route") - end - - test "succeeds in cleanup" do - assert 0 = @store.cleanup(0) - end - - test "allows update for new bucket" do - reset_time = :erlang.unique_integer([:positive]) - assert @store.update("ROUTE", 3, reset_time, 123) - end - end - - describe "updating existing bucket" do - @route "TESTBUCKET" - - setup context do - reset_time = :erlang.unique_integer([:positive]) - remaining_calls = 3 - latency = 123 - @store.update(@route, remaining_calls, reset_time, latency) - - Map.merge(context, %{ - remaining: remaining_calls, - reset_time: reset_time, - latency: latency - }) - end - - test "returns bucket", %{remaining: remaining, reset_time: reset_time, latency: latency} do - assert {@route, 
^remaining, ^reset_time, ^latency} = @store.lookup(@route) - end - - test "timeout_for/1" do - assert RatelimitBucket.timeout_for(@route, @store) - end - end - end - end -end