Skip to content

Commit

Permalink
Merge pull request #598 from Kraigie/jb3/zstd
Browse files Browse the repository at this point in the history
Support zstd-stream gateway compression
  • Loading branch information
jchristgit authored May 18, 2024
2 parents ace2e2f + 4e5672b commit 3ec78d4
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 16 deletions.
50 changes: 50 additions & 0 deletions guides/advanced/gateway_compression.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Gateway Compression

Nostrum supports either the `zlib-stream` or `zstd-stream` gateway compression
methods, as documented
[here](https://discord.com/developers/docs/topics/gateway#encoding-and-compression)

Most users are fine to leave the `gateway_compression` configuration option set
to `:zlib` (default), but users looking for a potential reduction in payload
sizes from the Discord gateway can optionally set `:zstd` here.

## Using `:zstd` compression

Using `:zstd` depends on the [`:ezstd`](https://hex.pm/packages/ezstd) library,
so you will have to add this dependency to your `mix.exs` file:

```elixir
defp deps do
[
{:nostrum, ...},
{:ezstd, "~> 1.1"} # new dependency
]
end
```


> #### `:ezstd` NIFs {: .info}
>
> Some functionality of `:ezstd` depends on Erlang NIFs (Natively Implemented
> Functions). This means that a proper compiler installation as well as other
> build tools like `git` may be necessary at the stage where you compile your
> dependencies.
>
> It may be useful to run `mix deps.compile` in any build systems to ensure that
> your application does not need build utilities in the built application image.
Once you have this additional dependency installed in your project, set the
`:nostrum`, `:gateway_compression` configuration option to `:zstd` and Nostrum
should pick up on it.

You will need to run `mix deps.get` and `mix deps.compile` to install and
compile the new `:ezstd` dependency.

> #### Nostrum detection of `:ezstd` {: .tip}
>
> Since the check for `:ezstd` takes place when you compile Nostrum, you might
> need to run `mix deps.compile --force nostrum` to ensure that Nostrum is
> recompiled and recognises the newly installed `:ezstd` dependency.
>
> Not doing this may mean that your compiled Nostrum version is still using
> dummy handlers that will error out even when `:ezstd` is installed.
3 changes: 3 additions & 0 deletions guides/intro/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Apart from the `token` field mentioned above, the following fields are also supp
of members for all guilds at startup. Depending on your [cache
backend](../advanced/pluggable_caching.md), this may increase startup time
and memory usage by quite a bit. Defaults to `false`.
- `gateway_compression` - use either `:zlib` (default) or `:zstd` for compression
of messages from the Discord gateway. See the documentation on
[Gateway Compression](../advanced/gateway_compression.md) for more information.


### Voice-specific
Expand Down
60 changes: 48 additions & 12 deletions lib/nostrum/shard/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,33 @@ defmodule Nostrum.Shard.Session do
alias Nostrum.Shard.{Connector, Event, Payload}
alias Nostrum.Struct.WSState

alias Nostrum.Shard.Session.Compression

require Logger

@behaviour :gen_statem

# Query string to connect to when upgrading the connection.
@gateway_qs "/?compress=zlib-stream&encoding=etf&v=10"
@gateway_qs "/?encoding=etf&v=10"

@gateway_compress Application.compile_env(
:nostrum,
:gateway_compression,
:zlib
)

@compression_module (case @gateway_compress do
:zlib ->
Compression.Zlib

:zstd ->
Compression.Zstd.check_available!()
Compression.Zstd

_ ->
raise ArgumentError,
"Unsupported compression type: #{@gateway_compress}"
end)

# Maximum time the initial connection may take.
@timeout_connect :timer.seconds(5)
Expand Down Expand Up @@ -190,34 +211,44 @@ defmodule Nostrum.Shard.Session do
# end

def connecting_ws(:enter, _from, %{conn: conn} = data) do
Logger.debug("Upgrading connection to websocket")
Logger.debug("Upgrading connection to websocket with #{@gateway_compress} compression")
set_timeout = {:state_timeout, @timeout_ws_upgrade, :upgrade_timeout}
stream = :gun.ws_upgrade(conn, @gateway_qs, [], %{flow: @standard_flow})
stream = :gun.ws_upgrade(conn, @gateway_qs <> compression_qs(), [], %{flow: @standard_flow})
{:keep_state, %{data | stream: stream}, set_timeout}
end

def connecting_ws(
:info,
{:gun_upgrade, _conn, _stream, ["websocket"], _headers},
%{zlib_ctx: nil} = data
%{compress_ctx: nil} = data
) do
zlib_context = :zlib.open()
:zlib.inflateInit(zlib_context)
context = @compression_module.create_context()

{:next_state, :connected,
%{data | zlib_ctx: zlib_context, last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}}
%{
data
| compress_ctx: context,
last_heartbeat_ack: DateTime.utc_now(),
heartbeat_ack: true
}}
end

def connecting_ws(
:info,
{:gun_upgrade, _conn, _stream, ["websocket"], _headers},
%{zlib_ctx: zlib_ctx} = data
%{compress_ctx: compress_ctx} = data
) do
Logger.info("Re-established websocket connection")
:ok = :zlib.inflateReset(zlib_ctx)

compress_ctx = @compression_module.reset_context(compress_ctx)

{:next_state, :connected,
%{data | last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}}
%{
data
| last_heartbeat_ack: DateTime.utc_now(),
heartbeat_ack: true,
compress_ctx: compress_ctx
}}
end

def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
Expand Down Expand Up @@ -248,9 +279,10 @@ defmodule Nostrum.Shard.Session do
end

def connected(:info, {:gun_ws, _worker, stream, {:binary, frame}}, data) do
payload_decompressed = @compression_module.inflate(data.compress_ctx, frame)

payload =
data.zlib_ctx
|> :zlib.inflate(frame)
payload_decompressed
|> :erlang.iolist_to_binary()
|> :erlang.binary_to_term()

Expand Down Expand Up @@ -375,4 +407,8 @@ defmodule Nostrum.Shard.Session do
:timeout
end
end

defp compression_qs do
"&compress=" <> Atom.to_string(@gateway_compress) <> "-stream"
end
end
32 changes: 32 additions & 0 deletions lib/nostrum/shard/session/compression.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Nostrum.Shard.Session.Compression do
@moduledoc """
A behaviour for compression methods supported by the Discord gateway to implement.
See the modules nested under this behaviour for reference implementations.
> ### Internal module {: .info}
>
> This module is intended for exclusive usage inside of nostrum, and is
> documented for completeness and people curious to look behind the covers.
"""

@doc """
Create a new compression context that can be passed as an argument to other
methods within the behaviour to inflate data or reset the context to a
blank state.
"""
@callback create_context() :: reference()

@doc """
Decompress a frame received from Discord over the gateway. Should return an
iolist of the decompressed data.
"""
@callback inflate(reference(), iodata()) :: iolist()

@doc """
Reset a decompression context to a blank slate, this is useful after a websocket
resume has taken place or something similar requiring the reset of the state
for a shard.
"""
@callback reset_context(reference()) :: reference()
end
32 changes: 32 additions & 0 deletions lib/nostrum/shard/session/compression/zlib.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Nostrum.Shard.Session.Compression.Zlib do
@moduledoc """
Implementation of compression methods for the `zlib` compression algorithm.
> ### Internal module {: .info}
>
> This module is intended for exclusive usage inside of nostrum, and is
> documented for completeness and people curious to look behind the covers.
"""

@behaviour Nostrum.Shard.Session.Compression

@spec create_context() :: :zlib.zstream()
def create_context do
context = :zlib.open()
:zlib.inflateInit(context)

context
end

@spec inflate(:zlib.zstream(), iodata()) :: iolist()
def inflate(ctx, frame) do
:zlib.inflate(ctx, frame)
end

@spec reset_context(:zlib.zstream()) :: :zlib.zstream()
def reset_context(ctx) do
:zlib.inflateReset(ctx)

ctx
end
end
61 changes: 61 additions & 0 deletions lib/nostrum/shard/session/compression/zstd.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Nostrum.Shard.Session.Compression.Zstd do
@moduledoc """
Implementation of compression methods for the `zstd` compression algorithm.
> ### Internal module {: .info}
>
> This module is intended for exclusive usage inside of nostrum, and is
> documented for completeness and people curious to look behind the covers.
"""

@behaviour Nostrum.Shard.Session.Compression

def check_available! do
if not Code.ensure_loaded?(:ezstd) do
zstd_missing()
end
end

@spec zstd_missing() :: no_return
defp zstd_missing do
raise ArgumentError, """
Cannot use the :zstd gateway compression option without optional dependency :ezstd.
See https://kraigie.github.io/nostrum/gateway_compression.html for more information.
"""
end

if Code.ensure_loaded?(:ezstd) do
@zstd_buffer_size 16_000

@spec create_context() :: reference() | {:error, any()}
def create_context do
:ezstd.create_decompression_context(@zstd_buffer_size)
end

@spec inflate(reference(), iodata()) :: iolist() | {:error, any()}
def inflate(ctx, frame) do
:ezstd.decompress_streaming(ctx, frame)
end

@spec reset_context(reference()) :: reference()
def reset_context(_ctx) do
create_context()
end
else
@spec create_context() :: reference() | {:error, any()}
def create_context do
zstd_missing()
end

@spec inflate(reference(), iodata()) :: iodata() | {:error, any()}
def inflate(_ctx, _frame) do
zstd_missing()
end

@spec reset_context(reference()) :: reference()
def reset_context(_ctx) do
zstd_missing()
end
end
end
8 changes: 4 additions & 4 deletions lib/nostrum/struct/ws_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Nostrum.Struct.WSState do
:last_heartbeat_ack,
:heartbeat_ack,
:heartbeat_interval,
:zlib_ctx
:compress_ctx
]

@typedoc "The shard number"
Expand Down Expand Up @@ -73,8 +73,8 @@ defmodule Nostrum.Struct.WSState do
@typedoc "Interval at which heartbeats are sent"
@type heartbeat_interval :: pos_integer() | nil

@typedoc "Reference to the current zlib context"
@type zlib_ctx :: reference | nil
@typedoc "Reference to the current compression context"
@type compress_ctx :: reference | nil

@type t :: %__MODULE__{
shard_num: shard_num,
Expand All @@ -90,6 +90,6 @@ defmodule Nostrum.Struct.WSState do
last_heartbeat_ack: last_heartbeat_ack,
heartbeat_ack: heartbeat_ack,
heartbeat_interval: heartbeat_interval,
zlib_ctx: zlib_ctx
compress_ctx: compress_ctx
}
end
2 changes: 2 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ defmodule Nostrum.Mixfile do
"guides/advanced/pluggable_caching.md",
"guides/advanced/multi_node.md",
"guides/advanced/hot_code_upgrade.md",
"guides/advanced/gateway_compression.md",
"guides/cheat-sheets/api.cheatmd",
"guides/cheat-sheets/qlc.cheatmd",
"guides/cheat-sheets/voice.cheatmd"
Expand Down Expand Up @@ -159,6 +160,7 @@ defmodule Nostrum.Mixfile do
{:certifi, "~> 2.13"},
{:kcl, "~> 1.4"},
{:mime, "~> 1.6 or ~> 2.0"},
{:ezstd, "~> 1.1", optional: true},
{:castle, "~> 0.3.0", runtime: false},
{:ex_doc, "~> 0.32", only: :dev, runtime: false},
{:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"equivalex": {:hex, :equivalex, "1.0.3", "170d9a82ae066e0020dfe1cf7811381669565922eb3359f6c91d7e9a1124ff74", [:mix], [], "hexpm", "46fa311adb855117d36e461b9c0ad2598f72110ad17ad73d7533c78020e045fc"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.32.1", "21e40f939515373bcdc9cffe65f3b3543f05015ac6c3d01d991874129d173420", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5142c9db521f106d61ff33250f779807ed2a88620e472ac95dc7d59c380113da"},
"ezstd": {:hex, :ezstd, "1.1.0", "d3b483d6acfadfb65dba4015371e6d54526dbf3d9ef0941b5add8bf5890731f4", [:rebar3], [], "hexpm", "28cfa0ed6cc3922095ad5ba0f23392a1664273358b17184baa909868361184e7"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"forecastle": {:hex, :forecastle, "0.1.1", "89dcfaccbfffe866cbd8a4c41ade55f62f00f1b5d0528bec787b1e6631004b98", [:mix], [], "hexpm", "f6f4d297224a22ac4387d305249aed7b8b02e85b4a03e83225af4536812c4079"},
"gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"},
Expand Down

0 comments on commit 3ec78d4

Please # to comment.