From 9d9c236fc9bd1ebafc00dc139ac607eaadd009a4 Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 3 Dec 2017 16:21:15 -0800 Subject: [PATCH 1/9] Initial connection pool implementation --- integration_test/cases/queue_test.exs | 4 +- integration_test/connection_pool/all_test.exs | 1 + .../connection_pool/test_helper.exs | 11 + lib/db_connection/connection_pool.ex | 341 ++++++++++++++++++ mix.exs | 2 +- 5 files changed, 356 insertions(+), 3 deletions(-) create mode 100644 integration_test/connection_pool/all_test.exs create mode 100644 integration_test/connection_pool/test_helper.exs create mode 100644 lib/db_connection/connection_pool.ex diff --git a/integration_test/cases/queue_test.exs b/integration_test/cases/queue_test.exs index 0383ace..30196ac 100644 --- a/integration_test/cases/queue_test.exs +++ b/integration_test/cases/queue_test.exs @@ -58,7 +58,7 @@ defmodule QueueTest do run = fn() -> try do - P.run(pool, fn(_) -> flunk("run ran") end, [pool_timeout: 50]) + P.run(pool, fn(_) -> flunk("run ran") end, [pool_timeout: 50, timeout: 50]) rescue DBConnection.ConnectionError -> :error @@ -165,7 +165,7 @@ defmodule QueueTest do {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self(), backoff_start: 30_000, - queue_timeout: 10, pool_timeout: 10] + queue_timeout: 10, pool_timeout: 10, timeout: 10] {:ok, pool} = P.start_link(opts) P.run(pool, fn(_) -> diff --git a/integration_test/connection_pool/all_test.exs b/integration_test/connection_pool/all_test.exs new file mode 100644 index 0000000..cba57bb --- /dev/null +++ b/integration_test/connection_pool/all_test.exs @@ -0,0 +1 @@ +Code.require_file "../tests.exs", __DIR__ \ No newline at end of file diff --git a/integration_test/connection_pool/test_helper.exs b/integration_test/connection_pool/test_helper.exs new file mode 100644 index 0000000..bf0bf84 --- /dev/null +++ b/integration_test/connection_pool/test_helper.exs @@ -0,0 +1,11 @@ +ExUnit.start([capture_log: true, assert_receive_timeout: 500, + exclude: [:idle_timeout, :pool_overflow, :enqueue_disconnected, + :queue_timeout_exit]]) + +Code.require_file "../../test/test_support.exs", __DIR__ + +defmodule TestPool do + use TestConnection, [pool: DBConnection.ConnectionPool, pool_size: 1] +end + +{:ok, _} = TestPool.ensure_all_started() \ No newline at end of file diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex new file mode 100644 index 0000000..9db65c1 --- /dev/null +++ b/lib/db_connection/connection_pool.ex @@ -0,0 +1,341 @@ +defmodule DBConnection.ConnectionPool do + + alias DBConnection.Ownership.PoolSupervisor + + @behaviour DBConnection.Pool + use GenServer + + @timeout 5000 + @queue true + @queue_target 50 + @queue_interval 1000 + + ## DBConnection.Pool API + + @doc false + def ensure_all_started(_opts, _type) do + {:ok, []} + end + + @doc false + def start_link(mod, opts) do + GenServer.start_link(__MODULE__, {mod, opts}, start_opts(opts)) + end + + @doc false + def child_spec(mod, opts, child_opts \\ []) do + Supervisor.Spec.worker(__MODULE__, [mod, opts], child_opts) + end + + @doc false + def checkout(pool, opts) do + now = System.monotonic_time(:milliseconds) + with pid when node(pid) == node() <- GenServer.whereis(pool) do + queue? = Keyword.get(opts, :queue, @queue) + # Technically its possible for caller to exit between starting + # timer and sending message in call. This leaks the timer but should + # be so rare and last a very short time period. + deadline = start_deadline(pid, now, opts) + try do + GenServer.call(pid, {:checkout, now, queue?, deadline}, :infinity) + catch + :exit, {reason, {:gen_statem, :call, [^pid | _]}} -> + exit({reason, {__MODULE__, :checkout, [pool, opts]}}) + end + else + nil -> + exit({:noproc, {__MODULE__, :checkout, [pool, opts]}}) + {_, node} -> + exit({{:badnode, node}, {__MODULE__, :checkout, [pool, opts]}}) + pid -> + exit({{:badnode, node(pid)}, {__MODULE__, :checkout, [pool, opts]}}) + end + end + + @doc false + def checkin({pid, {deadline, mon}}, conn, _) do + cancel_deadline(deadline) + now = System.monotonic_time(:milliseconds) + GenServer.cast(pid, {:checkin, now, mon, conn}) + end + + @doc false + def disconnect({pid, {deadline, mon}}, err, conn, _) do + cancel_deadline(deadline) + GenServer.cast(pid, {:disconnect, mon, err, conn}) + end + + @doc false + def stop({pid, {deadline, mon}}, err, conn, _) do + cancel_deadline(deadline) + GenServer.cast(pid, {:stop, mon, err, conn}) + end + + ## GenServer api + + def init({mod, opts}) do + queue = :ets.new(__MODULE__, [:private, :ordered_set]) + {:ok, pool, _} = PoolSupervisor.start_pool(mod, pool_opts(opts, queue)) + target = Keyword.get(opts, :queue_target, @queue_target) + interval = Keyword.get(opts, :queue_interval, @queue_interval) + state = %{queue: queue, pool: pool, target: target, interval: interval, + delay: 0, slow: false, next: System.monotonic_time(:milliseconds), + client: nil, conn: nil} + {:ok, state} + end + + def handle_call({:checkout, time, queue?, deadline}, {pid, _} = from, s) do + case s do + %{client: nil, conn: {:ok, _pool_ref, _mod, _state} = conn} -> + client = {deadline, Process.monitor(pid)} + {:reply, put_elem(conn, 1, {self(), client}), %{s | client: client}} + %{client: nil, conn: {:error, _} = error} -> + cancel_deadline(deadline) + {:reply, error, s} + %{queue: queue} when queue? == true -> + client = {deadline, Process.monitor(pid)} + # from is {pid, ref} so order could favor certain pid(s) + :ets.insert(queue, {{time, from}, client}) + {:noreply, s} + s when queue? == false -> + message = "connection not available and queuing is disabled" + err = DBConnection.ConnectionError.exception(message) + {:reply, {:error, err}, s} + end + end + + def handle_cast({:checkin, time, mon, state}, s) do + case s do + %{client: {_deadline, ^mon}, conn: {:ok, _pool_ref, _mod, _state} = conn} -> + result = dequeue(time, put_elem(conn, 3, state), s) + Process.demonitor(mon, [:flush]) + result + %{} -> + {:noreply, s} + end + end + + def handle_cast({:disconnect, mon, err, state}, s) do + abort(&DBConnection.Connection.disconnect/4, mon, err, state, s) + end + + def handle_cast({:stop, mon, err, state}, s) do + abort(&DBConnection.Connection.stop/4, mon, err, state, s) + end + + def handle_cast({:connected, time, queue}, %{queue: queue, conn: conn} = s) + when conn == nil or elem(conn, 0) == :error do + # TODO: connection process must send the state async so that pool never blocks. + case DBConnection.Connection.checkout(s.pool, [queue: false, timeout: :infinity]) do + {:ok, _pool_ref, _mod, _state} = conn -> + dequeue(time, conn, %{s | conn: conn}) + {:error, _} = error -> + error(queue, error) + {:noreply, %{s | conn: error}} + end + end + + def handle_info({:DOWN, mon, _, pid, reason}, s) do + case s do + %{client: {deadline, ^mon}} -> + cancel_deadline(deadline) + message = "client #{inspect pid} exited with: " <> Exception.format_exit(reason) + err = DBConnection.ConnectionError.exception(message) + fail(err, s) + %{} -> + down(mon, s) + end + end + + def handle_info({:timeout, deadline, {pid, sent, time}}, s) do + case s do + %{client: {^deadline, mon}} -> + Process.demonitor(mon, [:flush]) + message = "client #{inspect pid} timed out because " <> + "it queued and checked out the connection for longer than #{time-sent}ms" + err = DBConnection.ConnectionError.exception(message) + fail(err, s) + %{} -> + timeout(deadline, sent, time, s) + end + end + + defp dequeue(time, conn, s) do + case s do + %{next: next, delay: delay, target: target} when time > next -> + dequeue_first(time, delay > target, conn, s) + %{slow: false, queue: queue, delay: delay} -> + dequeue_fast(time, queue, delay, conn, s) + %{slow: true, queue: queue, delay: delay, target: target} -> + dequeue_slow(time, queue, delay, target * 2, conn, s) + end + end + + defp dequeue_first(time, slow?, conn, s) do + %{queue: queue, interval: interval} = s + next = time + interval + case :ets.first(queue) do + {sent, from} = key -> + client = go(queue, key, from, conn) + delay = time - sent + {:noreply, %{s | next: next, delay: delay, slow: slow?, client: client, + conn: conn}} + :"$end_of_table" -> + {:noreply, %{s | next: next, delay: 0, slow: slow?, client: nil, + conn: conn}} + end + end + + defp dequeue_fast(time, queue, delay, conn, s) do + case :ets.first(queue) do + {sent, from} = key -> + delay = min(time - sent, delay) + client = go(queue, key, from, conn) + {:noreply, %{s | delay: delay, client: client, conn: conn}} + :"$end_of_table" -> + {:noreply, %{s | delay: 0, client: nil, conn: conn}} + end + end + + defp dequeue_slow(time, queue, min_delay, timeout, conn, s) do + with {sent, from} = key <- :ets.first(queue) do + case time - sent do + delay when delay > timeout -> + drop(queue, key, from, delay) + dequeue_slow(time, queue, min(delay, min_delay), timeout, conn, s) + delay -> + client = go(queue, key, from, conn) + {:noreply, %{s | delay: min(delay, min_delay), client: client, + conn: conn}} + end + else + :"$end_of_table" -> + {:noreply, %{s | delay: 0, client: nil, conn: conn}} + end + end + + defp go(queue, key, from, conn) do + [{_, client}] = :ets.take(queue, key) + GenServer.reply(from, put_elem(conn, 1, {self(), client})) + client + end + + defp drop(queue, key, from, sojourn) do + message = "connection not available " <> + "and request was dropped from queue after #{sojourn}ms" + err = DBConnection.ConnectionError.exception(message) + error(queue, key, from, err) + end + + defp error(queue, key, from, err) do + [{_, {deadline, mon}}] = :ets.take(queue, key) + GenServer.reply(from, {:error, err}) + Process.demonitor(mon, [:flush]) + cancel_deadline(deadline) + end + + defp error(queue, err) do + case :ets.first(queue) do + {_, from} = key -> + error(queue, key, from, err) + :"$end_of_table" -> + :ok + end + end + + defp abort(fun, mon, err, state, s) do + case s do + %{client: {_deadline, ^mon}, conn: {:ok, pool_ref, _, _}} -> + Process.demonitor(mon, [:flush]) + fun.(pool_ref, err, state, []) + {:noreply, %{s | client: nil, conn: nil}} + %{} -> + {:noreply, s} + end + end + + defp fail(err, %{conn: {:ok, pool_ref, _, state}} = s) do + DBConnection.Connection.disconnect(pool_ref, err, state, []) + {:noreply, %{s | client: nil, conn: nil}} + end + + defp down(mon, %{queue: queue} = s) do + case :ets.match_object(queue, {:_, {:_, mon}}, 1) do + {[{key, {deadline, _}}], _cont} -> + cancel_deadline(deadline) + :ets.delete(queue, key) + {:noreply, s} + :"$end_of_table"-> + {:noreply, s} + end + end + + defp timeout(deadline, sent, time, %{queue: queue} = s) do + case :ets.match_object(queue, {{sent, :_}, {deadline, :_}}) do + [{{_, from} = key, {_, mon}}] -> + message = "connection not available " <> + "and request was dropped from queue after #{time-sent}ms" + err = DBConnection.ConnectionError.exception(message) + GenServer.reply(from, {:error, err}) + Process.demonitor(mon, [:flush]) + :ets.delete(queue, key) + {:noreply, s} + [] -> + {:noreply, s} + end + end + + defp start_opts(opts) do + Keyword.take(opts, [:name, :spawn_opt]) + end + + defp start_deadline(pid, now, opts) do + case abs_timeout(now, opts) do + nil -> + nil + timeout -> + :erlang.start_timer(timeout, pid, {self(), now, timeout}, [abs: true]) + end + end + + defp abs_timeout(now, opts) do + case Keyword.get(opts, :timeout, @timeout) do + :infinity -> + Keyword.get(opts, :deadline) + timeout -> + min(now + timeout, Keyword.get(opts, :deadline)) + end + end + + defp cancel_deadline(deadline) do + :erlang.cancel_timer(deadline, [async: true, info: false]) + end + + defp pool_opts(opts, ref) do + # TODO: Use a pool of size > 1 + opts + |> after_connect_hook(ref) + |> Keyword.put(:pool, DBConnection.Connection) + |> Keyword.put(:sync_connect, false) + |> Keyword.put(:idle, :passive) + end + + defp after_connect_hook(opts, ref) do + Keyword.update(opts, :after_connect, + {__MODULE__, :after_connect, [self(), ref]}, &{__MODULE__, :after_connect, [self(), ref, &1]}) + end + + def after_connect(conn, pool, ref, fun \\ fn _ -> :ok end) do + res = apply_fun(fun, conn) + GenServer.cast(pool, {:connected, System.monotonic_time(:milliseconds), ref}) + res + end + + defp apply_fun(fun, conn) when is_function(fun, 1) do + fun.(conn) + end + defp apply_fun({mod, fun, args}, conn) do + apply(mod, fun, [conn | args]) + end +end + \ No newline at end of file diff --git a/mix.exs b/mix.exs index 8844a7e..af1b34f 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule DBConnection.Mixfile do use Mix.Project - @pools [:connection, :poolboy, :sojourn, :ownership] + @pools [:connection, :connection_pool, :poolboy, :sojourn, :ownership] @version "1.1.2" def project do From 42738a293e5d381b81f1363f7d09e7418f4f1bcf Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 10 Dec 2017 13:25:16 -0800 Subject: [PATCH 2/9] Support multiple connections --- integration_test/cases/idle_test.exs | 2 +- integration_test/cases/queue_test.exs | 10 +- .../connection_pool/test_helper.exs | 2 +- lib/db_connection/app.ex | 1 + lib/db_connection/connection.ex | 58 +- lib/db_connection/connection_pool.ex | 494 ++++++++++-------- 6 files changed, 337 insertions(+), 230 deletions(-) diff --git a/integration_test/cases/idle_test.exs b/integration_test/cases/idle_test.exs index 9687fc1..d10342d 100644 --- a/integration_test/cases/idle_test.exs +++ b/integration_test/cases/idle_test.exs @@ -28,7 +28,7 @@ defmodule TestIdle do end] {:ok, agent} = A.start_link(stack) - opts = [agent: agent, parent: self(), idle_timeout: 50] + opts = [agent: agent, parent: self(), idle_timeout: 50, idle_interval: 50] {:ok, pool} = P.start_link(opts) assert_receive {:hi, conn} assert_receive {:pong, ^conn} diff --git a/integration_test/cases/queue_test.exs b/integration_test/cases/queue_test.exs index 30196ac..6987dcd 100644 --- a/integration_test/cases/queue_test.exs +++ b/integration_test/cases/queue_test.exs @@ -42,7 +42,8 @@ defmodule QueueTest do stack = [{:ok, :state}] {:ok, agent} = A.start_link(stack) - opts = [agent: agent, parent: self(), queue_timeout: 50] + opts = [agent: agent, parent: self(), queue_timeout: 50, queue_target: 50, + queue_interval: 50] {:ok, pool} = P.start_link(opts) parent = self() @@ -58,7 +59,7 @@ defmodule QueueTest do run = fn() -> try do - P.run(pool, fn(_) -> flunk("run ran") end, [pool_timeout: 50, timeout: 50]) + P.run(pool, fn(_) -> flunk("run ran") end, [pool_timeout: 50]) rescue DBConnection.ConnectionError -> :error @@ -147,7 +148,8 @@ defmodule QueueTest do {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self(), backoff_start: 30_000, - queue_timeout: 10, pool_timeout: 10] + queue_timeout: 10, pool_timeout: 10, queue_target: 10, + queue_interval: 10] {:ok, pool} = P.start_link(opts) P.run(pool, fn(_) -> @@ -165,7 +167,7 @@ defmodule QueueTest do {:ok, agent} = A.start_link(stack) opts = [agent: agent, parent: self(), backoff_start: 30_000, - queue_timeout: 10, pool_timeout: 10, timeout: 10] + queue_timeout: 10, pool_timeout: 10, queue_target: 10, queue_interval: 10] {:ok, pool} = P.start_link(opts) P.run(pool, fn(_) -> diff --git a/integration_test/connection_pool/test_helper.exs b/integration_test/connection_pool/test_helper.exs index bf0bf84..14d35d7 100644 --- a/integration_test/connection_pool/test_helper.exs +++ b/integration_test/connection_pool/test_helper.exs @@ -1,5 +1,5 @@ ExUnit.start([capture_log: true, assert_receive_timeout: 500, - exclude: [:idle_timeout, :pool_overflow, :enqueue_disconnected, + exclude: [:pool_overflow, :enqueue_disconnected, :queue_timeout_exit]]) Code.require_file "../../test/test_support.exs", __DIR__ diff --git a/lib/db_connection/app.ex b/lib/db_connection/app.ex index 54aabcc..0ae4526 100644 --- a/lib/db_connection/app.ex +++ b/lib/db_connection/app.ex @@ -9,6 +9,7 @@ defmodule DBConnection.App do supervisor(DBConnection.Task, []), supervisor(DBConnection.Sojourn.Supervisor, []), supervisor(DBConnection.Ownership.PoolSupervisor, []), + supervisor(DBConnection.ConnectionPool.PoolSupervisor, []), worker(DBConnection.Watcher, []) ] Supervisor.start_link(children, strategy: :one_for_all, name: __MODULE__) diff --git a/lib/db_connection/connection.ex b/lib/db_connection/connection.ex index 708a2c5..ae9780d 100644 --- a/lib/db_connection/connection.ex +++ b/lib/db_connection/connection.ex @@ -17,6 +17,7 @@ defmodule DBConnection.Connection do use Connection require Logger alias DBConnection.Backoff + alias DBConnection.ConnectionPool @pool_timeout 5_000 @timeout 15_000 @@ -83,6 +84,11 @@ defmodule DBConnection.Connection do end end + @doc false + def ping({pid, ref}, state) do + Connection.cast(pid, {:ping, ref, state}) + end + ## Internal API @doc false @@ -96,19 +102,26 @@ defmodule DBConnection.Connection do Supervisor.Spec.worker(__MODULE__, [mod, opts, mode], child_opts) end + @doc false + def child_spec(mod, opts, mode, info, child_opts) do + Supervisor.Spec.worker(__MODULE__, [mod, opts, mode, info], child_opts) + end + ## Connection API @doc false def init({mod, opts, mode, info}) do - queue = if mode == :sojourn, do: :broker, else: :queue.new() + queue = if mode in [:connection, :poolboy], do: :queue.new(), else: mode + idle = if mode in [:connection, :poolboy], do: get_idle(opts), else: :passive broker = if mode == :sojourn, do: elem(info, 0) regulator = if mode == :sojourn, do: elem(info, 1) - idle = if mode == :sojourn, do: :passive, else: get_idle(opts) after_timeout = if mode == :poolboy, do: :stop, else: :backoff + pool = if mode == :connection_pool, do: elem(info, 0) + tag = if mode == :connection_pool, do: elem(info, 1) s = %{mod: mod, opts: opts, state: nil, client: :closed, broker: broker, - regulator: regulator, lock: nil, queue: queue, timer: nil, - backoff: Backoff.new(opts), + regulator: regulator, lock: nil, pool: pool, tag: tag, queue: queue, + timer: nil, backoff: Backoff.new(opts), after_connect: Keyword.get(opts, :after_connect), after_connect_timeout: Keyword.get(opts, :after_connect_timeout, @timeout), idle: idle, @@ -191,7 +204,7 @@ defmodule DBConnection.Connection do @doc false def handle_call({:checkout, ref, queue?, timeout}, {pid, _} = from, s) do case s do - %{queue: :broker} -> + %{queue: mode} when is_atom(mode) -> exit(:bad_checkout) %{client: nil, idle: :passive, mod: mod, state: state} -> Connection.reply(from, {:ok, {self(), ref}, mod, state}) @@ -229,6 +242,16 @@ defmodule DBConnection.Connection do end @doc false + def handle_cast({:ping, ref, state}, %{client: {ref, :pool}} = s) do + %{mod: mod} = s + case apply(mod, :ping, [state]) do + {:ok, state} -> + pool_update(state, s) + {:disconnect, err, state} -> + {:disconnect, {:log, err}, %{s | state: state}} + end + end + def handle_cast({:checkin, ref, state}, %{client: {ref, _}} = s) do handle_next(state, s) end @@ -244,7 +267,7 @@ defmodule DBConnection.Connection do {:stop, {err, stack}, %{s | state: state}} end - def handle_cast({:cancel, _}, %{queue: :broker}) do + def handle_cast({:cancel, _}, %{queue: mode}) when is_atom(mode) do exit(:bad_cancel) end def handle_cast({:cancel, ref}, %{client: {ref, _}, state: state} = s) do @@ -293,7 +316,9 @@ defmodule DBConnection.Connection do def handle_cast({:connected, ref}, %{client: {ref, :connect}} = s) do %{mod: mod, state: state, queue: queue, broker: broker} = s case apply(mod, :checkout, [state]) do - {:ok, state} when queue == :broker -> + {:ok, state} when queue == :connection_pool -> + pool_update(state, s) + {:ok, state} when queue == :sojourn -> info = {self(), mod, state} {:await, ^ref, _} = :sbroker.async_ask_r(broker, info, {self(), ref}) {:noreply, %{s | client: {ref, :broker}, state: state}} @@ -320,7 +345,8 @@ defmodule DBConnection.Connection do err = DBConnection.ConnectionError.exception(message) {:disconnect, {down_log(reason), err}, %{s | client: {ref, nil}}} end - def handle_info({:DOWN, _, :process, _, _} = msg, %{queue: :broker} = s) do + def handle_info({:DOWN, _, :process, _, _} = msg, %{queue: mode} = s) + when is_atom(mode) do do_handle_info(msg, s) end def handle_info({:DOWN, ref, :process, _, _} = msg, %{queue: queue} = s) do @@ -410,7 +436,7 @@ defmodule DBConnection.Connection do defp start_opts(:connection, opts) do Keyword.take(opts, [:debug, :name, :timeout, :spawn_opt]) end - defp start_opts(mode, opts) when mode in [:poolboy, :sojourn] do + defp start_opts(mode, opts) when mode in [:poolboy, :sojourn, :connection_pool] do Keyword.take(opts, [:debug, :spawn_opt]) end @@ -460,7 +486,10 @@ defmodule DBConnection.Connection do demonitor(client) handle_next(state, %{s | client: nil, backoff: backoff}) end - defp handle_next(state, %{queue: :broker} = s) do + defp handle_next(state, %{queue: :connection_pool} = s) do + pool_update(state, s) + end + defp handle_next(state, %{queue: :sojourn} = s) do %{client: client, timer: timer, mod: mod, broker: broker} = s demonitor(client) cancel_timer(timer) @@ -600,7 +629,9 @@ defmodule DBConnection.Connection do end end - defp clear_queue(:broker), do: :broker + defp clear_queue(queue) when is_atom(queue) do + queue + end defp clear_queue(queue) do clear = fn({{_, mon}, _, from}) -> @@ -619,6 +650,11 @@ defmodule DBConnection.Connection do :ok end + defp pool_update(state, %{pool: pool, tag: tag, mod: mod} = s) do + ref = ConnectionPool.update(pool, tag, mod, state) + {:noreply, %{s | client: {ref, :pool}, state: state}, :hibernate} + end + defp normal_status(mod, pdict, state) do try do mod.format_status(:normal, [pdict, state]) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 9db65c1..b031c67 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -1,14 +1,15 @@ defmodule DBConnection.ConnectionPool do - alias DBConnection.Ownership.PoolSupervisor - @behaviour DBConnection.Pool use GenServer + alias DBConnection.ConnectionPool.PoolSupervisor @timeout 5000 @queue true @queue_target 50 @queue_interval 1000 + @idle_interval 1000 + @holder_key :__info__ ## DBConnection.Pool API @@ -29,275 +30,275 @@ defmodule DBConnection.ConnectionPool do @doc false def checkout(pool, opts) do + queue? = Keyword.get(opts, :queue, @queue) now = System.monotonic_time(:milliseconds) - with pid when node(pid) == node() <- GenServer.whereis(pool) do - queue? = Keyword.get(opts, :queue, @queue) - # Technically its possible for caller to exit between starting - # timer and sending message in call. This leaks the timer but should - # be so rare and last a very short time period. - deadline = start_deadline(pid, now, opts) - try do - GenServer.call(pid, {:checkout, now, queue?, deadline}, :infinity) - catch - :exit, {reason, {:gen_statem, :call, [^pid | _]}} -> - exit({reason, {__MODULE__, :checkout, [pool, opts]}}) - end - else - nil -> - exit({:noproc, {__MODULE__, :checkout, [pool, opts]}}) - {_, node} -> - exit({{:badnode, node}, {__MODULE__, :checkout, [pool, opts]}}) - pid -> - exit({{:badnode, node(pid)}, {__MODULE__, :checkout, [pool, opts]}}) + timeout = abs_timeout(now, opts) + case GenServer.call(pool, {:checkout, now, queue?}, :infinity) do + {:ok, holder} -> + recv_holder(holder, now, timeout) + {:error, _err} = error -> + error end end @doc false - def checkin({pid, {deadline, mon}}, conn, _) do + def checkin({pool, ref, deadline, holder}, conn, _) do cancel_deadline(deadline) now = System.monotonic_time(:milliseconds) - GenServer.cast(pid, {:checkin, now, mon, conn}) + checkin_holder(holder, pool, conn, {:checkin, ref, now}) end @doc false - def disconnect({pid, {deadline, mon}}, err, conn, _) do + def disconnect({pool, ref, deadline, holder}, err, conn, _) do cancel_deadline(deadline) - GenServer.cast(pid, {:disconnect, mon, err, conn}) + checkin_holder(holder, pool, conn, {:disconnect, ref, err}) end @doc false - def stop({pid, {deadline, mon}}, err, conn, _) do + def stop({pool, ref, deadline, holder}, err, conn, _) do cancel_deadline(deadline) - GenServer.cast(pid, {:stop, mon, err, conn}) + checkin_holder(holder, pool, conn, {:stop, ref, err}) + end + + ## Holder api + + @doc false + def update(pool, ref, mod, state) do + holder = start_holder(pool, ref, mod, state) + now = System.monotonic_time(:milliseconds) + checkin_holder(holder, pool, state, {:checkin, ref, now}) + holder end ## GenServer api def init({mod, opts}) do - queue = :ets.new(__MODULE__, [:private, :ordered_set]) - {:ok, pool, _} = PoolSupervisor.start_pool(mod, pool_opts(opts, queue)) + queue = :ets.new(__MODULE__.Queue, [:private, :ordered_set]) + {:ok, _} = PoolSupervisor.start_pool(queue, mod, opts) target = Keyword.get(opts, :queue_target, @queue_target) interval = Keyword.get(opts, :queue_interval, @queue_interval) - state = %{queue: queue, pool: pool, target: target, interval: interval, - delay: 0, slow: false, next: System.monotonic_time(:milliseconds), - client: nil, conn: nil} - {:ok, state} - end - - def handle_call({:checkout, time, queue?, deadline}, {pid, _} = from, s) do - case s do - %{client: nil, conn: {:ok, _pool_ref, _mod, _state} = conn} -> - client = {deadline, Process.monitor(pid)} - {:reply, put_elem(conn, 1, {self(), client}), %{s | client: client}} - %{client: nil, conn: {:error, _} = error} -> - cancel_deadline(deadline) - {:reply, error, s} - %{queue: queue} when queue? == true -> - client = {deadline, Process.monitor(pid)} - # from is {pid, ref} so order could favor certain pid(s) - :ets.insert(queue, {{time, from}, client}) - {:noreply, s} - s when queue? == false -> + idle_interval = Keyword.get(opts, :idle_interval, @idle_interval) + now = System.monotonic_time(:milliseconds) + codel = %{target: target, interval: interval, delay: 0, slow: false, + next: now, poll: nil, idle_interval: idle_interval, idle: nil} + codel = start_idle(now, now, start_poll(now, now, codel)) + {:ok, {:busy, queue, codel}} + end + + def handle_call({:checkout, now, queue?}, from, {:busy, queue, _} = busy) do + case queue? do + true -> + {pid, _} = from + mon = Process.monitor(pid) + :ets.insert(queue, {{now, System.unique_integer()}, from, mon}) + {:noreply, busy} + false -> message = "connection not available and queuing is disabled" err = DBConnection.ConnectionError.exception(message) - {:reply, {:error, err}, s} + {:reply, {:error, err}, busy} end end - def handle_cast({:checkin, time, mon, state}, s) do - case s do - %{client: {_deadline, ^mon}, conn: {:ok, _pool_ref, _mod, _state} = conn} -> - result = dequeue(time, put_elem(conn, 3, state), s) - Process.demonitor(mon, [:flush]) - result - %{} -> - {:noreply, s} + def handle_call({:checkout, _now, _queue?} = checkout, from, ready) do + {:ready, queue, _codel} = ready + case :ets.first(queue) do + {_time, holder} = key -> + checkout_holder(holder, from, queue) and :ets.delete(queue, key) + {:noreply, ready} + :"$end_of_table" -> + handle_call(checkout, from, put_elem(ready, 0, :busy)) end end - def handle_cast({:disconnect, mon, err, state}, s) do - abort(&DBConnection.Connection.disconnect/4, mon, err, state, s) - end - - def handle_cast({:stop, mon, err, state}, s) do - abort(&DBConnection.Connection.stop/4, mon, err, state, s) - end - - def handle_cast({:connected, time, queue}, %{queue: queue, conn: conn} = s) - when conn == nil or elem(conn, 0) == :error do - # TODO: connection process must send the state async so that pool never blocks. - case DBConnection.Connection.checkout(s.pool, [queue: false, timeout: :infinity]) do - {:ok, _pool_ref, _mod, _state} = conn -> - dequeue(time, conn, %{s | conn: conn}) - {:error, _} = error -> - error(queue, error) - {:noreply, %{s | conn: error}} + def handle_info({:"ETS-TRANSFER", holder, pid, queue}, {_, queue, _} = data) do + message = "client #{inspect pid} exited" + err = DBConnection.ConnectionError.exception(message) + disconnect_holder(holder, err) + {:noreply, data} + end + + def handle_info({:"ETS-TRANSFER", holder, _, {msg, queue, extra}}, {_, queue, _} = data) do + case msg do + :checkin -> + handle_checkin(holder, extra, data) + :disconnect -> + disconnect_holder(holder, extra) + {:noreply, data} + :stop -> + stop_holder(holder, extra) + {:noreply, data} end end - def handle_info({:DOWN, mon, _, pid, reason}, s) do - case s do - %{client: {deadline, ^mon}} -> - cancel_deadline(deadline) - message = "client #{inspect pid} exited with: " <> Exception.format_exit(reason) - err = DBConnection.ConnectionError.exception(message) - fail(err, s) - %{} -> - down(mon, s) - end + def handle_info({:DOWN, mon, _, _, _}, {_, queue, _ } = data) do + :ets.match_delete(queue, {:_, {:_, mon}}) + {:noreply, data} end - def handle_info({:timeout, deadline, {pid, sent, time}}, s) do - case s do - %{client: {^deadline, mon}} -> - Process.demonitor(mon, [:flush]) + def handle_info({:timeout, deadline, {queue, holder, pid, len}}, {_, queue, _} = data) do + # Check that timeout refers to current holder (and not previous) + try do + :ets.lookup_element(holder, @holder_key, 3) + rescue + ArgumentError -> + :ok + else + ^deadline -> + :ets.update_element(holder, @holder_key, {3, nil}) message = "client #{inspect pid} timed out because " <> - "it queued and checked out the connection for longer than #{time-sent}ms" + "it queued and checked out the connection for longer than #{len}ms" err = DBConnection.ConnectionError.exception(message) - fail(err, s) - %{} -> - timeout(deadline, sent, time, s) + disconnect_holder(holder, err) + _ -> + :ok end + {:noreply, data} end - defp dequeue(time, conn, s) do - case s do - %{next: next, delay: delay, target: target} when time > next -> - dequeue_first(time, delay > target, conn, s) - %{slow: false, queue: queue, delay: delay} -> - dequeue_fast(time, queue, delay, conn, s) - %{slow: true, queue: queue, delay: delay, target: target} -> - dequeue_slow(time, queue, delay, target * 2, conn, s) + def handle_info({:timeout, poll, {time, last_sent}}, {_, _, %{poll: poll}} = data) do + {status, queue, codel} = data + # If no queue progress since last poll check queue + case :ets.first(queue) do + {sent, _} when sent <= last_sent and status == :busy -> + delay = time - sent + timeout(delay, time, queue, start_poll(time, sent, codel)) + {sent, _} -> + {:noreply, {status, queue, start_poll(time, sent, codel)}} + :"$end_of_table" -> + {:noreply, {status, queue, start_poll(time, time, codel)}} end end - defp dequeue_first(time, slow?, conn, s) do - %{queue: queue, interval: interval} = s - next = time + interval + def handle_info({:timeout, idle, {time, last_sent}}, {_, _, %{idle: idle}} = data) do + {status, queue, codel} = data + # If no queue progress since last idle check oldest connection case :ets.first(queue) do - {sent, from} = key -> - client = go(queue, key, from, conn) - delay = time - sent - {:noreply, %{s | next: next, delay: delay, slow: slow?, client: client, - conn: conn}} - :"$end_of_table" -> - {:noreply, %{s | next: next, delay: 0, slow: slow?, client: nil, - conn: conn}} + {sent, _} = key when sent <= last_sent and status == :ready -> + ping(key, queue, start_idle(time, last_sent, codel)) + {sent, _} -> + {:noreply, {:ready, queue, start_idle(time, sent, codel)}} + :"$end_of_table" -> + {:noreply, {status, queue, start_idle(time, time, codel)}} end end - defp dequeue_fast(time, queue, delay, conn, s) do - case :ets.first(queue) do - {sent, from} = key -> - delay = min(time - sent, delay) - client = go(queue, key, from, conn) - {:noreply, %{s | delay: delay, client: client, conn: conn}} - :"$end_of_table" -> - {:noreply, %{s | delay: 0, client: nil, conn: conn}} - end + defp timeout(delay, time, queue, codel) do + case codel do + %{delay: min_delay, next: next, target: target, interval: interval} + when time >= next and min_delay > target -> + codel = %{codel | slow: true, delay: delay, next: time + interval} + drop_slow(time, target * 2, queue) + {:noreply, {:busy, queue, codel}} + %{next: next, interval: interval} when time >= next -> + codel = %{codel | slow: false, delay: delay, next: time + interval} + {:noreply, {:busy, queue, codel}} + _ -> + {:noreply, {:busy, queue, codel}} + end end - defp dequeue_slow(time, queue, min_delay, timeout, conn, s) do - with {sent, from} = key <- :ets.first(queue) do - case time - sent do - delay when delay > timeout -> - drop(queue, key, from, delay) - dequeue_slow(time, queue, min(delay, min_delay), timeout, conn, s) - delay -> - client = go(queue, key, from, conn) - {:noreply, %{s | delay: min(delay, min_delay), client: client, - conn: conn}} - end - else - :"$end_of_table" -> - {:noreply, %{s | delay: 0, client: nil, conn: conn}} + defp drop_slow(time, timeout, queue) do + min_sent = time - timeout + match = {{:"$1", :_}, :"$2", :"$3"} + guards = [{:<, :"$1", min_sent}] + select_slow = [{match, guards, [{{:"$1", :"$2", :"$3"}}]}] + for {sent, from, mon} <- :ets.select(queue, select_slow) do + drop(time, from, mon, sent) end + :ets.select_delete(queue, [{match, guards, [true]}]) end - defp go(queue, key, from, conn) do - [{_, client}] = :ets.take(queue, key) - GenServer.reply(from, put_elem(conn, 1, {self(), client})) - client + defp ping({_, holder} = key, queue, codel) do + [{_, conn, _, _, state}] = :ets.lookup(holder, @holder_key) + DBConnection.Connection.ping({conn, holder}, state) + :ets.delete(holder) + :ets.delete(queue, key) + {:noreply, {:ready, queue, codel}} end - defp drop(queue, key, from, sojourn) do - message = "connection not available " <> - "and request was dropped from queue after #{sojourn}ms" - err = DBConnection.ConnectionError.exception(message) - error(queue, key, from, err) + defp handle_checkin(holder, now, {:ready, queue, _} = data) do + :ets.insert(queue, {{now, holder}}) + {:noreply, data} end - defp error(queue, key, from, err) do - [{_, {deadline, mon}}] = :ets.take(queue, key) - GenServer.reply(from, {:error, err}) - Process.demonitor(mon, [:flush]) - cancel_deadline(deadline) + defp handle_checkin(holder, now, {:busy, queue, codel}) do + dequeue(now, holder, queue, codel) + end + + defp dequeue(time, holder, queue, codel) do + case codel do + %{next: next, delay: delay, target: target} when time >= next -> + dequeue_first(time, delay > target, holder, queue, codel) + %{slow: false} -> + dequeue_fast(time, holder, queue, codel) + %{slow: true, target: target} -> + dequeue_slow(time, target * 2, holder, queue, codel) + end end - defp error(queue, err) do + defp dequeue_first(time, slow?, holder, queue, codel) do + %{interval: interval} = codel + next = time + interval case :ets.first(queue) do - {_, from} = key -> - error(queue, key, from, err) + {sent, _} = key -> + delay = time - sent + codel = %{codel | next: next, delay: delay, slow: slow?} + pop(key, delay, time, holder, queue, codel) :"$end_of_table" -> - :ok + codel = %{codel | next: next, delay: 0, slow: slow?} + :ets.insert(queue, {{time, holder}}) + {:noreply, {:ready, queue, codel}} end end - defp abort(fun, mon, err, state, s) do - case s do - %{client: {_deadline, ^mon}, conn: {:ok, pool_ref, _, _}} -> - Process.demonitor(mon, [:flush]) - fun.(pool_ref, err, state, []) - {:noreply, %{s | client: nil, conn: nil}} - %{} -> - {:noreply, s} - end + defp dequeue_fast(time, holder, queue, codel) do + case :ets.first(queue) do + {sent, _} = key -> + pop(key, time - sent, time, holder, queue, codel) + :"$end_of_table" -> + :ets.insert(queue, {{time, holder}}) + {:noreply, {:ready, queue, %{codel | delay: 0}}} + end end - defp fail(err, %{conn: {:ok, pool_ref, _, state}} = s) do - DBConnection.Connection.disconnect(pool_ref, err, state, []) - {:noreply, %{s | client: nil, conn: nil}} + defp dequeue_slow(time, timeout, holder, queue, codel) do + case :ets.first(queue) do + {sent, _} = key when time - sent > timeout -> + [{_, from, mon}] = :ets.take(queue, key) + drop(time, from, mon, sent) + dequeue_slow(time, timeout, holder, queue, codel) + {sent, _} = key -> + pop(key, time - sent, time, holder, queue, codel) + :"$end_of_table" -> + :ets.insert(queue, {{time, holder}}) + {:noreply, {:ready, queue, %{codel | delay: 0}}} + end end - defp down(mon, %{queue: queue} = s) do - case :ets.match_object(queue, {:_, {:_, mon}}, 1) do - {[{key, {deadline, _}}], _cont} -> - cancel_deadline(deadline) - :ets.delete(queue, key) - {:noreply, s} - :"$end_of_table"-> - {:noreply, s} + defp pop(key, delay, time, holder, queue, %{delay: min} = codel) do + [{_, from, mon}] = :ets.take(queue, key) + case Process.demonitor(mon, [:flush, :info]) and checkout_holder(holder, from, queue) do + true when delay < min -> + {:noreply, {:busy, queue, %{codel | delay: delay}}} + true -> + {:noreply, {:busy, queue, codel}} + false -> + dequeue(time, holder, queue, codel) end end - defp timeout(deadline, sent, time, %{queue: queue} = s) do - case :ets.match_object(queue, {{sent, :_}, {deadline, :_}}) do - [{{_, from} = key, {_, mon}}] -> - message = "connection not available " <> - "and request was dropped from queue after #{time-sent}ms" - err = DBConnection.ConnectionError.exception(message) - GenServer.reply(from, {:error, err}) - Process.demonitor(mon, [:flush]) - :ets.delete(queue, key) - {:noreply, s} - [] -> - {:noreply, s} - end + defp drop(time, from, mon, sent) do + message = "connection not available " <> + "and request was dropped from queue after #{time - sent}ms" + err = DBConnection.ConnectionError.exception(message) + GenServer.reply(from, {:error, err}) + Process.demonitor(mon, [:flush]) end defp start_opts(opts) do Keyword.take(opts, [:name, :spawn_opt]) end - defp start_deadline(pid, now, opts) do - case abs_timeout(now, opts) do - nil -> - nil - timeout -> - :erlang.start_timer(timeout, pid, {self(), now, timeout}, [abs: true]) - end - end - defp abs_timeout(now, opts) do case Keyword.get(opts, :timeout, @timeout) do :infinity -> @@ -307,35 +308,102 @@ defmodule DBConnection.ConnectionPool do end end + defp start_deadline(nil, _, _, _, _) do + nil + end + defp start_deadline(timeout, pid, ref, holder, start) do + deadline = :erlang.start_timer(timeout, pid, {ref, holder, self(), timeout-start}, [abs: true]) + :ets.update_element(holder, @holder_key, {3, deadline}) + deadline + end + + defp cancel_deadline(nil) do + :ok + end + defp cancel_deadline(deadline) do :erlang.cancel_timer(deadline, [async: true, info: false]) end - defp pool_opts(opts, ref) do - # TODO: Use a pool of size > 1 - opts - |> after_connect_hook(ref) - |> Keyword.put(:pool, DBConnection.Connection) - |> Keyword.put(:sync_connect, false) - |> Keyword.put(:idle, :passive) + defp start_poll(now, last_sent, %{interval: interval} = codel) do + timeout = now + interval + poll = :erlang.start_timer(timeout, self(), {timeout, last_sent}, [abs: true]) + %{codel | poll: poll} + end + + defp start_idle(now, last_sent, %{idle_interval: interval} = codel) do + timeout = now + interval + idle = :erlang.start_timer(timeout, self(), {timeout, last_sent}, [abs: true]) + %{codel | idle: idle} + end + + defp start_holder(pool, ref, mod, state) do + # Insert before setting heir so that pool can't receive empty table + holder = :ets.new(__MODULE__.Holder, [:public, :ordered_set]) + :true = :ets.insert_new(holder, {@holder_key, self(), nil, mod, state}) + :ets.setopts(holder, {:heir, pool, ref}) + holder + end + + defp checkout_holder(holder, {pid, _} = from, ref) do + try do + :ets.give_away(holder, pid, ref) + GenServer.reply(from, {:ok, holder}) + true + rescue + ArgumentError -> + # Likely the local pid died so won't receive but possible foreign pid + msg = "cannot use connection pool on foreign node #{node()}" + err = DBConnection.ConnectionError.exception(msg) + GenServer.reply(from, {:error, err}) + false + end + end + + defp recv_holder(holder, start, timeout) do + receive do + {:"ETS-TRANSFER", ^holder, pool, ref} -> + deadline = start_deadline(timeout, pool, ref, holder, start) + try do + :ets.lookup(holder, @holder_key) + rescue + ArgumentError -> + # Deadline could hit and by handled pool before using connectoon + msg = "connection not available because deadline reached while in queue" + {:error, DBConnection.ConnectionError.exception(msg)} + else + [{_, _, _, mod, state}] -> + {:ok, {pool, ref, deadline, holder}, mod, state} + end + end end - defp after_connect_hook(opts, ref) do - Keyword.update(opts, :after_connect, - {__MODULE__, :after_connect, [self(), ref]}, &{__MODULE__, :after_connect, [self(), ref, &1]}) + defp checkin_holder(holder, pool, state, msg) do + try do + :ets.update_element(holder, @holder_key, [{3, nil}, {5, state}]) + :ets.give_away(holder, pool, msg) + rescue + ArgumentError -> + :ok + else + true -> + :ok + end end - def after_connect(conn, pool, ref, fun \\ fn _ -> :ok end) do - res = apply_fun(fun, conn) - GenServer.cast(pool, {:connected, System.monotonic_time(:milliseconds), ref}) - res + defp disconnect_holder(holder, err) do + delete_holder(holder, &DBConnection.Connection.disconnect/4, err) end - defp apply_fun(fun, conn) when is_function(fun, 1) do - fun.(conn) + defp stop_holder(holder, err) do + delete_holder(holder, &DBConnection.Connection.stop/4, err) end - defp apply_fun({mod, fun, args}, conn) do - apply(mod, fun, [conn | args]) + + defp delete_holder(holder, stop, err) do + [{_, conn, deadline, _, state}] = :ets.lookup(holder, @holder_key) + :ets.delete(holder) + cancel_deadline(deadline) + stop.({conn, holder}, err, state, []) end end \ No newline at end of file From 405344248d76da66d9e4c58d3deb7d3913cecd49 Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 10 Dec 2017 15:34:51 -0800 Subject: [PATCH 3/9] Fix incorrect status change --- lib/db_connection/connection_pool.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index b031c67..3669c35 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -176,7 +176,7 @@ defmodule DBConnection.ConnectionPool do {sent, _} = key when sent <= last_sent and status == :ready -> ping(key, queue, start_idle(time, last_sent, codel)) {sent, _} -> - {:noreply, {:ready, queue, start_idle(time, sent, codel)}} + {:noreply, {status, queue, start_idle(time, sent, codel)}} :"$end_of_table" -> {:noreply, {status, queue, start_idle(time, time, codel)}} end From c27e88bdbcc43dae08c8e6b9a3e052c0c7826bcf Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 10 Dec 2017 15:57:05 -0800 Subject: [PATCH 4/9] Add the missing modules --- lib/db_connection/connection_pool/pool.ex | 17 +++++++++++++++++ .../connection_pool/pool_supervisor.ex | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 lib/db_connection/connection_pool/pool.ex create mode 100644 lib/db_connection/connection_pool/pool_supervisor.ex diff --git a/lib/db_connection/connection_pool/pool.ex b/lib/db_connection/connection_pool/pool.ex new file mode 100644 index 0000000..cc01a72 --- /dev/null +++ b/lib/db_connection/connection_pool/pool.ex @@ -0,0 +1,17 @@ +defmodule DBConnection.ConnectionPool.Pool do + @moduledoc false + + def start_link(owner, tag, mod, opts) do + size = Keyword.get(opts, :pool_size, 20) + children = for id <- 1..size, do: conn(owner, tag, id, mod, opts) + sup_opts = [strategy: :one_for_one] ++ Keyword.take(opts, [:max_restarts, :max_seconds]) + Supervisor.start_link(children, sup_opts) + end + + ## Helpers + + defp conn(owner, tag, id, mod, opts) do + child_opts = [id: {mod, owner, id}] ++ Keyword.take(opts, [:shutdown]) + DBConnection.Connection.child_spec(mod, opts, :connection_pool, {owner, tag}, child_opts) + end +end diff --git a/lib/db_connection/connection_pool/pool_supervisor.ex b/lib/db_connection/connection_pool/pool_supervisor.ex new file mode 100644 index 0000000..cdae617 --- /dev/null +++ b/lib/db_connection/connection_pool/pool_supervisor.ex @@ -0,0 +1,16 @@ +defmodule DBConnection.ConnectionPool.PoolSupervisor do + @moduledoc false + + alias DBConnection.ConnectionPool.Pool + import Supervisor.Spec + + def start_link() do + pool = supervisor(Pool, [], [restart: :temporary]) + sup_opts = [strategy: :simple_one_for_one, name: __MODULE__] + Supervisor.start_link([pool], sup_opts) + end + + def start_pool(tag, mod, opts) do + DBConnection.Watcher.watch(__MODULE__, [self(), tag, mod, opts]) + end +end From 2817ab4cebe30482b321a14b8806bc2d23aba5cd Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 10 Dec 2017 16:00:19 -0800 Subject: [PATCH 5/9] Fix time unit on pre 1.3 --- lib/db_connection/connection_pool.ex | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 3669c35..bf21e56 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -9,6 +9,7 @@ defmodule DBConnection.ConnectionPool do @queue_target 50 @queue_interval 1000 @idle_interval 1000 + @time_unit 1000 @holder_key :__info__ ## DBConnection.Pool API @@ -31,7 +32,7 @@ defmodule DBConnection.ConnectionPool do @doc false def checkout(pool, opts) do queue? = Keyword.get(opts, :queue, @queue) - now = System.monotonic_time(:milliseconds) + now = System.monotonic_time(@time_unit) timeout = abs_timeout(now, opts) case GenServer.call(pool, {:checkout, now, queue?}, :infinity) do {:ok, holder} -> @@ -44,7 +45,7 @@ defmodule DBConnection.ConnectionPool do @doc false def checkin({pool, ref, deadline, holder}, conn, _) do cancel_deadline(deadline) - now = System.monotonic_time(:milliseconds) + now = System.monotonic_time(@time_unit) checkin_holder(holder, pool, conn, {:checkin, ref, now}) end @@ -65,7 +66,7 @@ defmodule DBConnection.ConnectionPool do @doc false def update(pool, ref, mod, state) do holder = start_holder(pool, ref, mod, state) - now = System.monotonic_time(:milliseconds) + now = System.monotonic_time(@time_unit) checkin_holder(holder, pool, state, {:checkin, ref, now}) holder end @@ -78,7 +79,7 @@ defmodule DBConnection.ConnectionPool do target = Keyword.get(opts, :queue_target, @queue_target) interval = Keyword.get(opts, :queue_interval, @queue_interval) idle_interval = Keyword.get(opts, :idle_interval, @idle_interval) - now = System.monotonic_time(:milliseconds) + now = System.monotonic_time(@time_unit) codel = %{target: target, interval: interval, delay: 0, slow: false, next: now, poll: nil, idle_interval: idle_interval, idle: nil} codel = start_idle(now, now, start_poll(now, now, codel)) From cb408cb728cc573e8a436b84ee40d89661984138 Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 11 Feb 2018 17:04:57 -0800 Subject: [PATCH 6/9] Skip monitoring in ConnectionPool Not required to monitor as give away and heir handles tracking caller --- lib/db_connection/connection_pool.ex | 60 +++++++++++++--------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index bf21e56..5b0f2d9 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -89,9 +89,7 @@ defmodule DBConnection.ConnectionPool do def handle_call({:checkout, now, queue?}, from, {:busy, queue, _} = busy) do case queue? do true -> - {pid, _} = from - mon = Process.monitor(pid) - :ets.insert(queue, {{now, System.unique_integer()}, from, mon}) + :ets.insert(queue, {{now, System.unique_integer(), from}}) {:noreply, busy} false -> message = "connection not available and queuing is disabled" @@ -131,11 +129,6 @@ defmodule DBConnection.ConnectionPool do end end - def handle_info({:DOWN, mon, _, _, _}, {_, queue, _ } = data) do - :ets.match_delete(queue, {:_, {:_, mon}}) - {:noreply, data} - end - def handle_info({:timeout, deadline, {queue, holder, pid, len}}, {_, queue, _} = data) do # Check that timeout refers to current holder (and not previous) try do @@ -160,10 +153,10 @@ defmodule DBConnection.ConnectionPool do {status, queue, codel} = data # If no queue progress since last poll check queue case :ets.first(queue) do - {sent, _} when sent <= last_sent and status == :busy -> + {sent, _, _} when sent <= last_sent and status == :busy -> delay = time - sent timeout(delay, time, queue, start_poll(time, sent, codel)) - {sent, _} -> + {sent, _, _} -> {:noreply, {status, queue, start_poll(time, sent, codel)}} :"$end_of_table" -> {:noreply, {status, queue, start_poll(time, time, codel)}} @@ -174,8 +167,9 @@ defmodule DBConnection.ConnectionPool do {status, queue, codel} = data # If no queue progress since last idle check oldest connection case :ets.first(queue) do - {sent, _} = key when sent <= last_sent and status == :ready -> - ping(key, queue, start_idle(time, last_sent, codel)) + {sent, holder} = key when sent <= last_sent and status == :ready -> + :ets.delete(queue, key) + ping(holder, queue, start_idle(time, last_sent, codel)) {sent, _} -> {:noreply, {status, queue, start_idle(time, sent, codel)}} :"$end_of_table" -> @@ -200,20 +194,19 @@ defmodule DBConnection.ConnectionPool do defp drop_slow(time, timeout, queue) do min_sent = time - timeout - match = {{:"$1", :_}, :"$2", :"$3"} + match = {{:"$1", :_, :"$2"}} guards = [{:<, :"$1", min_sent}] - select_slow = [{match, guards, [{{:"$1", :"$2", :"$3"}}]}] - for {sent, from, mon} <- :ets.select(queue, select_slow) do - drop(time, from, mon, sent) + select_slow = [{match, guards, [{{:"$1", :"$2"}}]}] + for {sent, from} <- :ets.select(queue, select_slow) do + drop(time - sent, from) end :ets.select_delete(queue, [{match, guards, [true]}]) end - defp ping({_, holder} = key, queue, codel) do + defp ping(holder, queue, codel) do [{_, conn, _, _, state}] = :ets.lookup(holder, @holder_key) DBConnection.Connection.ping({conn, holder}, state) :ets.delete(holder) - :ets.delete(queue, key) {:noreply, {:ready, queue, codel}} end @@ -241,10 +234,11 @@ defmodule DBConnection.ConnectionPool do %{interval: interval} = codel next = time + interval case :ets.first(queue) do - {sent, _} = key -> + {sent, _, from} = key -> + :ets.delete(queue, key) delay = time - sent codel = %{codel | next: next, delay: delay, slow: slow?} - pop(key, delay, time, holder, queue, codel) + go(delay, from, time, holder, queue, codel) :"$end_of_table" -> codel = %{codel | next: next, delay: 0, slow: slow?} :ets.insert(queue, {{time, holder}}) @@ -254,8 +248,9 @@ defmodule DBConnection.ConnectionPool do defp dequeue_fast(time, holder, queue, codel) do case :ets.first(queue) do - {sent, _} = key -> - pop(key, time - sent, time, holder, queue, codel) + {sent, _, from} = key -> + :ets.delete(queue, key) + go(time - sent, from, time, holder, queue, codel) :"$end_of_table" -> :ets.insert(queue, {{time, holder}}) {:noreply, {:ready, queue, %{codel | delay: 0}}} @@ -264,21 +259,21 @@ defmodule DBConnection.ConnectionPool do defp dequeue_slow(time, timeout, holder, queue, codel) do case :ets.first(queue) do - {sent, _} = key when time - sent > timeout -> - [{_, from, mon}] = :ets.take(queue, key) - drop(time, from, mon, sent) + {sent, _, from} = key when time - sent > timeout -> + :ets.delete(queue, key) + drop(time - sent, from) dequeue_slow(time, timeout, holder, queue, codel) - {sent, _} = key -> - pop(key, time - sent, time, holder, queue, codel) + {sent, _, from} = key -> + :ets.delete(queue, key) + go(time - sent, from, time, holder, queue, codel) :"$end_of_table" -> :ets.insert(queue, {{time, holder}}) {:noreply, {:ready, queue, %{codel | delay: 0}}} end end - defp pop(key, delay, time, holder, queue, %{delay: min} = codel) do - [{_, from, mon}] = :ets.take(queue, key) - case Process.demonitor(mon, [:flush, :info]) and checkout_holder(holder, from, queue) do + defp go(delay, from, time, holder, queue, %{delay: min} = codel) do + case checkout_holder(holder, from, queue) do true when delay < min -> {:noreply, {:busy, queue, %{codel | delay: delay}}} true -> @@ -288,12 +283,11 @@ defmodule DBConnection.ConnectionPool do end end - defp drop(time, from, mon, sent) do + defp drop(delay, from) do message = "connection not available " <> - "and request was dropped from queue after #{time - sent}ms" + "and request was dropped from queue after #{delay}ms" err = DBConnection.ConnectionError.exception(message) GenServer.reply(from, {:error, err}) - Process.demonitor(mon, [:flush]) end defp start_opts(opts) do From 668b0c99eb6d8d69449cd3c63f16b1d8ec1d9df5 Mon Sep 17 00:00:00 2001 From: James Fish Date: Sun, 11 Feb 2018 23:11:58 -0800 Subject: [PATCH 7/9] Handle poll/idle when ready/busy queue active --- lib/db_connection/connection_pool.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 5b0f2d9..a700fb4 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -158,7 +158,7 @@ defmodule DBConnection.ConnectionPool do timeout(delay, time, queue, start_poll(time, sent, codel)) {sent, _, _} -> {:noreply, {status, queue, start_poll(time, sent, codel)}} - :"$end_of_table" -> + _ -> {:noreply, {status, queue, start_poll(time, time, codel)}} end end @@ -172,7 +172,7 @@ defmodule DBConnection.ConnectionPool do ping(holder, queue, start_idle(time, last_sent, codel)) {sent, _} -> {:noreply, {status, queue, start_idle(time, sent, codel)}} - :"$end_of_table" -> + _ -> {:noreply, {status, queue, start_idle(time, time, codel)}} end end From ccc8548405feffb99260b330f911456b09b97aa5 Mon Sep 17 00:00:00 2001 From: James Fish Date: Tue, 24 Apr 2018 22:24:06 -0700 Subject: [PATCH 8/9] Switch to raw message in connection pool --- lib/db_connection/connection_pool.ex | 81 +++++++++++++++++----------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index a700fb4..37d7411 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -34,11 +34,26 @@ defmodule DBConnection.ConnectionPool do queue? = Keyword.get(opts, :queue, @queue) now = System.monotonic_time(@time_unit) timeout = abs_timeout(now, opts) - case GenServer.call(pool, {:checkout, now, queue?}, :infinity) do - {:ok, holder} -> - recv_holder(holder, now, timeout) - {:error, _err} = error -> + with {:ok, pid} <- whereis_pool(pool), + {:ok, owner, ref, holder} <- checkout(pid, queue?, now) do + deadline = start_deadline(timeout, owner, ref, holder, now) + try do + :ets.lookup(holder, @holder_key) + rescue + ArgumentError -> + # Deadline could hit and by handled pool before using connection + msg = "connection not available because deadline reached while in queue" + {:error, DBConnection.ConnectionError.exception(msg)} + else + [{_, _, _, mod, state}] -> + pool_ref = {owner, ref, deadline, holder} + {:ok, pool_ref, mod, state} + end + else + {:error, _} = error -> error + {:exit, reason} -> + exit({reason, {__MODULE__, :checkout, [pool, opts]}}) end end @@ -86,7 +101,7 @@ defmodule DBConnection.ConnectionPool do {:ok, {:busy, queue, codel}} end - def handle_call({:checkout, now, queue?}, from, {:busy, queue, _} = busy) do + def handle_info({:db_connection, from, {:checkout, now, queue?}}, {:busy, queue, _} = busy) do case queue? do true -> :ets.insert(queue, {{now, System.unique_integer(), from}}) @@ -94,18 +109,19 @@ defmodule DBConnection.ConnectionPool do false -> message = "connection not available and queuing is disabled" err = DBConnection.ConnectionError.exception(message) - {:reply, {:error, err}, busy} + GenServer.reply(from, {:error, err}) + {:noreply, busy} end end - def handle_call({:checkout, _now, _queue?} = checkout, from, ready) do + def handle_info({:db_connection, from, {:checkout, _now, _queue?}} = checkout, ready) do {:ready, queue, _codel} = ready case :ets.first(queue) do {_time, holder} = key -> checkout_holder(holder, from, queue) and :ets.delete(queue, key) {:noreply, ready} :"$end_of_table" -> - handle_call(checkout, from, put_elem(ready, 0, :busy)) + handle_info(checkout, put_elem(ready, 0, :busy)) end end @@ -340,36 +356,41 @@ defmodule DBConnection.ConnectionPool do holder end - defp checkout_holder(holder, {pid, _} = from, ref) do + defp checkout_holder(holder, {pid, mref}, ref) do try do - :ets.give_away(holder, pid, ref) - GenServer.reply(from, {:ok, holder}) - true + :ets.give_away(holder, pid, {mref, ref}) rescue ArgumentError -> - # Likely the local pid died so won't receive but possible foreign pid - msg = "cannot use connection pool on foreign node #{node()}" - err = DBConnection.ConnectionError.exception(msg) - GenServer.reply(from, {:error, err}) + # pid is not alive false end end - defp recv_holder(holder, start, timeout) do + defp whereis_pool(pool) do + case GenServer.whereis(pool) do + pid when node(pid) == node() -> + {:ok, pid} + pid when node(pid) != node() -> + {:exit, {:badnode, node(pid)}} + {_name, node} -> + {:exit, {:badnode, node}} + nil -> + {:exit, :noproc} + end + end + + defp checkout(pid, queue?, start) do + mref = Process.monitor(pid) + send(pid, {:db_connection, {self(), mref}, {:checkout, start, queue?}}) receive do - {:"ETS-TRANSFER", ^holder, pool, ref} -> - deadline = start_deadline(timeout, pool, ref, holder, start) - try do - :ets.lookup(holder, @holder_key) - rescue - ArgumentError -> - # Deadline could hit and by handled pool before using connectoon - msg = "connection not available because deadline reached while in queue" - {:error, DBConnection.ConnectionError.exception(msg)} - else - [{_, _, _, mod, state}] -> - {:ok, {pool, ref, deadline, holder}, mod, state} - end + {:"ETS-TRANSFER", holder, owner, {^mref, ref}} -> + Process.demonitor(mref, [:flush]) + {:ok, owner, ref, holder} + {^mref, reply} -> + Process.demonitor(mref, [:flush]) + reply + {:DOWN, ^mref, _, _, reason} -> + {:exit, reason} end end From b58d49a011517484e35a4ce0c488f7e5c836efb7 Mon Sep 17 00:00:00 2001 From: James Fish Date: Tue, 24 Apr 2018 22:34:12 -0700 Subject: [PATCH 9/9] Support Elixir 1.2 --- lib/db_connection/connection_pool.ex | 43 +++++++++++++++------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/lib/db_connection/connection_pool.ex b/lib/db_connection/connection_pool.ex index 37d7411..0cf069c 100644 --- a/lib/db_connection/connection_pool.ex +++ b/lib/db_connection/connection_pool.ex @@ -34,22 +34,9 @@ defmodule DBConnection.ConnectionPool do queue? = Keyword.get(opts, :queue, @queue) now = System.monotonic_time(@time_unit) timeout = abs_timeout(now, opts) - with {:ok, pid} <- whereis_pool(pool), - {:ok, owner, ref, holder} <- checkout(pid, queue?, now) do - deadline = start_deadline(timeout, owner, ref, holder, now) - try do - :ets.lookup(holder, @holder_key) - rescue - ArgumentError -> - # Deadline could hit and by handled pool before using connection - msg = "connection not available because deadline reached while in queue" - {:error, DBConnection.ConnectionError.exception(msg)} - else - [{_, _, _, mod, state}] -> - pool_ref = {owner, ref, deadline, holder} - {:ok, pool_ref, mod, state} - end - else + case checkout(pool, queue?, now, timeout) do + {:ok, _, _, _} = ok -> + ok {:error, _} = error -> error {:exit, reason} -> @@ -366,10 +353,10 @@ defmodule DBConnection.ConnectionPool do end end - defp whereis_pool(pool) do + defp checkout(pool, queue?, start, timeout) do case GenServer.whereis(pool) do pid when node(pid) == node() -> - {:ok, pid} + checkout_call(pid, queue?, start, timeout) pid when node(pid) != node() -> {:exit, {:badnode, node(pid)}} {_name, node} -> @@ -379,13 +366,15 @@ defmodule DBConnection.ConnectionPool do end end - defp checkout(pid, queue?, start) do + defp checkout_call(pid, queue?, start, timeout) do mref = Process.monitor(pid) send(pid, {:db_connection, {self(), mref}, {:checkout, start, queue?}}) receive do {:"ETS-TRANSFER", holder, owner, {^mref, ref}} -> Process.demonitor(mref, [:flush]) - {:ok, owner, ref, holder} + deadline = start_deadline(timeout, owner, ref, holder, start) + pool_ref = {owner, ref, deadline, holder} + checkout_info(holder, pool_ref) {^mref, reply} -> Process.demonitor(mref, [:flush]) reply @@ -394,6 +383,20 @@ defmodule DBConnection.ConnectionPool do end end + defp checkout_info(holder, pool_ref) do + try do + :ets.lookup(holder, @holder_key) + rescue + ArgumentError -> + # Deadline could hit and by handled pool before using connection + msg = "connection not available because deadline reached while in queue" + {:error, DBConnection.ConnectionError.exception(msg)} + else + [{_, _, _, mod, state}] -> + {:ok, pool_ref, mod, state} + end + end + defp checkin_holder(holder, pool, state, msg) do try do :ets.update_element(holder, @holder_key, [{3, nil}, {5, state}])