Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add built in connection pool #108

Merged
merged 9 commits into from
Jun 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_test/cases/idle_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule TestIdle do
end]
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self(), idle_timeout: 50]
opts = [agent: agent, parent: self(), idle_timeout: 50, idle_interval: 50]
{:ok, pool} = P.start_link(opts)
assert_receive {:hi, conn}
assert_receive {:pong, ^conn}
Expand Down
8 changes: 5 additions & 3 deletions integration_test/cases/queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ defmodule QueueTest do
stack = [{:ok, :state}]
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self(), queue_timeout: 50]
opts = [agent: agent, parent: self(), queue_timeout: 50, queue_target: 50,
queue_interval: 50]
{:ok, pool} = P.start_link(opts)

parent = self()
Expand Down Expand Up @@ -147,7 +148,8 @@ defmodule QueueTest do
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self(), backoff_start: 30_000,
queue_timeout: 10, pool_timeout: 10]
queue_timeout: 10, pool_timeout: 10, queue_target: 10,
queue_interval: 10]
{:ok, pool} = P.start_link(opts)

P.run(pool, fn(_) ->
Expand All @@ -165,7 +167,7 @@ defmodule QueueTest do
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self(), backoff_start: 30_000,
queue_timeout: 10, pool_timeout: 10]
queue_timeout: 10, pool_timeout: 10, queue_target: 10, queue_interval: 10]
{:ok, pool} = P.start_link(opts)

P.run(pool, fn(_) ->
Expand Down
1 change: 1 addition & 0 deletions integration_test/connection_pool/all_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Code.require_file "../tests.exs", __DIR__
11 changes: 11 additions & 0 deletions integration_test/connection_pool/test_helper.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ExUnit.start([capture_log: true, assert_receive_timeout: 500,
exclude: [:pool_overflow, :enqueue_disconnected,
:queue_timeout_exit]])

Code.require_file "../../test/test_support.exs", __DIR__

defmodule TestPool do
use TestConnection, [pool: DBConnection.ConnectionPool, pool_size: 1]
end

{:ok, _} = TestPool.ensure_all_started()
1 change: 1 addition & 0 deletions lib/db_connection/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule DBConnection.App do
supervisor(DBConnection.Task, []),
supervisor(DBConnection.Sojourn.Supervisor, []),
supervisor(DBConnection.Ownership.PoolSupervisor, []),
supervisor(DBConnection.ConnectionPool.PoolSupervisor, []),
worker(DBConnection.Watcher, [])
]
Supervisor.start_link(children, strategy: :one_for_all, name: __MODULE__)
Expand Down
58 changes: 47 additions & 11 deletions lib/db_connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule DBConnection.Connection do
use Connection
require Logger
alias DBConnection.Backoff
alias DBConnection.ConnectionPool

@pool_timeout 5_000
@timeout 15_000
Expand Down Expand Up @@ -83,6 +84,11 @@ defmodule DBConnection.Connection do
end
end

@doc false
def ping({pid, ref}, state) do
Connection.cast(pid, {:ping, ref, state})
end

## Internal API

@doc false
Expand All @@ -96,19 +102,26 @@ defmodule DBConnection.Connection do
Supervisor.Spec.worker(__MODULE__, [mod, opts, mode], child_opts)
end

@doc false
def child_spec(mod, opts, mode, info, child_opts) do
Supervisor.Spec.worker(__MODULE__, [mod, opts, mode, info], child_opts)
end

## Connection API

@doc false
def init({mod, opts, mode, info}) do
queue = if mode == :sojourn, do: :broker, else: :queue.new()
queue = if mode in [:connection, :poolboy], do: :queue.new(), else: mode
idle = if mode in [:connection, :poolboy], do: get_idle(opts), else: :passive
broker = if mode == :sojourn, do: elem(info, 0)
regulator = if mode == :sojourn, do: elem(info, 1)
idle = if mode == :sojourn, do: :passive, else: get_idle(opts)
after_timeout = if mode == :poolboy, do: :stop, else: :backoff
pool = if mode == :connection_pool, do: elem(info, 0)
tag = if mode == :connection_pool, do: elem(info, 1)

s = %{mod: mod, opts: opts, state: nil, client: :closed, broker: broker,
regulator: regulator, lock: nil, queue: queue, timer: nil,
backoff: Backoff.new(opts),
regulator: regulator, lock: nil, pool: pool, tag: tag, queue: queue,
timer: nil, backoff: Backoff.new(opts),
after_connect: Keyword.get(opts, :after_connect),
after_connect_timeout: Keyword.get(opts, :after_connect_timeout,
@timeout), idle: idle,
Expand Down Expand Up @@ -191,7 +204,7 @@ defmodule DBConnection.Connection do
@doc false
def handle_call({:checkout, ref, queue?, timeout}, {pid, _} = from, s) do
case s do
%{queue: :broker} ->
%{queue: mode} when is_atom(mode) ->
exit(:bad_checkout)
%{client: nil, idle: :passive, mod: mod, state: state} ->
Connection.reply(from, {:ok, {self(), ref}, mod, state})
Expand Down Expand Up @@ -229,6 +242,16 @@ defmodule DBConnection.Connection do
end

@doc false
def handle_cast({:ping, ref, state}, %{client: {ref, :pool}} = s) do
%{mod: mod} = s
case apply(mod, :ping, [state]) do
{:ok, state} ->
pool_update(state, s)
{:disconnect, err, state} ->
{:disconnect, {:log, err}, %{s | state: state}}
end
end

def handle_cast({:checkin, ref, state}, %{client: {ref, _}} = s) do
handle_next(state, s)
end
Expand All @@ -244,7 +267,7 @@ defmodule DBConnection.Connection do
{:stop, {err, stack}, %{s | state: state}}
end

def handle_cast({:cancel, _}, %{queue: :broker}) do
def handle_cast({:cancel, _}, %{queue: mode}) when is_atom(mode) do
exit(:bad_cancel)
end
def handle_cast({:cancel, ref}, %{client: {ref, _}, state: state} = s) do
Expand Down Expand Up @@ -293,7 +316,9 @@ defmodule DBConnection.Connection do
def handle_cast({:connected, ref}, %{client: {ref, :connect}} = s) do
%{mod: mod, state: state, queue: queue, broker: broker} = s
case apply(mod, :checkout, [state]) do
{:ok, state} when queue == :broker ->
{:ok, state} when queue == :connection_pool ->
pool_update(state, s)
{:ok, state} when queue == :sojourn ->
info = {self(), mod, state}
{:await, ^ref, _} = :sbroker.async_ask_r(broker, info, {self(), ref})
{:noreply, %{s | client: {ref, :broker}, state: state}}
Expand All @@ -320,7 +345,8 @@ defmodule DBConnection.Connection do
err = DBConnection.ConnectionError.exception(message)
{:disconnect, {down_log(reason), err}, %{s | client: {ref, nil}}}
end
def handle_info({:DOWN, _, :process, _, _} = msg, %{queue: :broker} = s) do
def handle_info({:DOWN, _, :process, _, _} = msg, %{queue: mode} = s)
when is_atom(mode) do
do_handle_info(msg, s)
end
def handle_info({:DOWN, ref, :process, _, _} = msg, %{queue: queue} = s) do
Expand Down Expand Up @@ -410,7 +436,7 @@ defmodule DBConnection.Connection do
defp start_opts(:connection, opts) do
Keyword.take(opts, [:debug, :name, :timeout, :spawn_opt])
end
defp start_opts(mode, opts) when mode in [:poolboy, :sojourn] do
defp start_opts(mode, opts) when mode in [:poolboy, :sojourn, :connection_pool] do
Keyword.take(opts, [:debug, :spawn_opt])
end

Expand Down Expand Up @@ -460,7 +486,10 @@ defmodule DBConnection.Connection do
demonitor(client)
handle_next(state, %{s | client: nil, backoff: backoff})
end
defp handle_next(state, %{queue: :broker} = s) do
defp handle_next(state, %{queue: :connection_pool} = s) do
pool_update(state, s)
end
defp handle_next(state, %{queue: :sojourn} = s) do
%{client: client, timer: timer, mod: mod, broker: broker} = s
demonitor(client)
cancel_timer(timer)
Expand Down Expand Up @@ -600,7 +629,9 @@ defmodule DBConnection.Connection do
end
end

defp clear_queue(:broker), do: :broker
defp clear_queue(queue) when is_atom(queue) do
queue
end
defp clear_queue(queue) do
clear =
fn({{_, mon}, _, from}) ->
Expand All @@ -619,6 +650,11 @@ defmodule DBConnection.Connection do
:ok
end

defp pool_update(state, %{pool: pool, tag: tag, mod: mod} = s) do
ref = ConnectionPool.update(pool, tag, mod, state)
{:noreply, %{s | client: {ref, :pool}, state: state}, :hibernate}
end

defp normal_status(mod, pdict, state) do
try do
mod.format_status(:normal, [pdict, state])
Expand Down
Loading