diff --git a/bench/group_by_bench.exs b/bench/group_by_bench.exs new file mode 100644 index 0000000..215e6b0 --- /dev/null +++ b/bench/group_by_bench.exs @@ -0,0 +1,48 @@ +defmodule GroupByBench do + use Benchfella + + alias Manifold.Utils + + setup_all do + pids = for _ <- 0..5000, do: spawn_link &loop/0 + {:ok, pids} + end + + defp loop() do + receive do + _ -> loop() + end + end + + bench "group by 48" do + bench_context + |> Utils.group_by(&:erlang.phash2(&1, 48)) + end + + bench "partition_pids 48" do + bench_context + |> Utils.partition_pids(48) + end + + bench "group by 24" do + bench_context + |> Utils.group_by(&:erlang.phash2(&1, 24)) + end + + bench "partition_pids 24" do + bench_context + |> Utils.partition_pids(24) + end + + bench "group by 8" do + bench_context + |> Utils.group_by(&:erlang.phash2(&1, 8)) + end + + bench "partition_pids 8" do + bench_context + |> Utils.partition_pids(8) + end + + +end \ No newline at end of file diff --git a/bench/group_by_one.exs b/bench/group_by_one.exs new file mode 100644 index 0000000..dee0735 --- /dev/null +++ b/bench/group_by_one.exs @@ -0,0 +1,48 @@ +defmodule GroupByOneBench do + use Benchfella + + alias Manifold.Utils + + setup_all do + pids = [spawn_link &loop/0] + + {:ok, pids} + end + + defp loop() do + receive do + _ -> loop() + end + end + + + bench "group by 48" do + bench_context + |> Utils.group_by(&:erlang.phash2(&1, 48)) + end + + bench "partition_pids 48" do + bench_context + |> Utils.partition_pids(48) + end + + bench "group by 24" do + bench_context + |> Utils.group_by(&:erlang.phash2(&1, 24)) + end + + bench "partition_pids 24" do + bench_context + |> Utils.partition_pids(24) + end + + bench "group by 8" do + bench_context + |> Utils.group_by(&:erlang.phash2(&1, 8)) + end + + bench "partition_pids 8" do + bench_context + |> Utils.partition_pids(8) + end +end \ No newline at end of file diff --git a/bench/partitioner_benches.exs b/bench/partitioner_benches.exs new file mode 100644 index 0000000..b0921ed --- /dev/null +++ b/bench/partitioner_benches.exs @@ -0,0 +1,77 @@ +defmodule WorkerSendBenches do + use Benchfella + + alias Manifold.Utils + + defmodule Worker do + use GenServer + + ## Client + @spec start_link :: GenServer.on_start + def start_link, do: GenServer.start_link(__MODULE__, []) + + @spec send(pid, [pid], term) :: :ok + def send(pid, pids, message), do: GenServer.cast(pid, {:send, pids, message}) + + ## Server Callbacks + @spec init([]) :: {:ok, nil} + def init([]), do: {:ok, nil} + + def handle_cast({:send, _pids, _message}, nil) do + {:noreply, nil} + end + + def handle_cast(_message, nil), do: {:noreply, nil} + end + + + setup_all do + workers = (for _ <- 0..15, do: Worker.start_link() |> elem(1)) |> List.to_tuple + pids = for _ <- 0..200, do: spawn_link &loop/0 + + pids_by_partition = Utils.partition_pids(pids, tuple_size(workers)) + pids_by_partition_map = Utils.group_by(pids, &Utils.partition_for(&1, tuple_size(workers))) + + {:ok, {workers, pids_by_partition, pids_by_partition_map}} + end + + defp loop() do + receive do + _ -> loop() + end + end + + bench "enum reduce send" do + {workers, _, pids_by_partition_map} = bench_context + Enum.reduce(pids_by_partition_map, workers, fn ({partition, pids}, state) -> + {worker_pid, state} = get_worker_pid(partition, state) + Worker.send(worker_pid, pids, :hi) + state + end) + end + + bench "do_send send" do + {workers, pids_by_partition, _} = bench_context + do_send(:hi, pids_by_partition, workers, 0, tuple_size(pids_by_partition)) + end + + + defp get_worker_pid(partition, state) do + case elem(state, partition) do + nil -> + {:ok, pid} = Worker.start_link() + {pid, put_elem(state, partition, pid)} + pid -> + {pid, state} + end + end + + defp do_send(_message, _pids_by_partition, _workers, partitions, partitions), do: :ok + defp do_send(message, pids_by_partition, workers, partition, partitions) do + pids = elem(pids_by_partition, partition) + if pids != [] do + Worker.send(elem(workers, partition), pids, message) + end + do_send(message, pids_by_partition, workers, partition + 1, partitions) + end +end \ No newline at end of file diff --git a/bench/partitioner_benches_one.exs b/bench/partitioner_benches_one.exs new file mode 100644 index 0000000..a4367ef --- /dev/null +++ b/bench/partitioner_benches_one.exs @@ -0,0 +1,77 @@ +defmodule WorkerSendOneBenches do + use Benchfella + + alias Manifold.Utils + + defmodule Worker do + use GenServer + + ## Client + @spec start_link :: GenServer.on_start + def start_link, do: GenServer.start_link(__MODULE__, []) + + @spec send(pid, [pid], term) :: :ok + def send(pid, pids, message), do: GenServer.cast(pid, {:send, pids, message}) + + ## Server Callbacks + @spec init([]) :: {:ok, nil} + def init([]), do: {:ok, nil} + + def handle_cast({:send, _pids, _message}, nil) do + {:noreply, nil} + end + + def handle_cast(_message, nil), do: {:noreply, nil} + end + + + setup_all do + workers = (for _ <- 0..47, do: Worker.start_link() |> elem(1)) |> List.to_tuple + pids = [spawn_link &loop/0] + + pids_by_partition = Utils.partition_pids(pids, tuple_size(workers)) + pids_by_partition_map = Utils.group_by(pids, &Utils.partition_for(&1, tuple_size(workers))) + + {:ok, {workers, pids_by_partition, pids_by_partition_map}} + end + + defp loop() do + receive do + _ -> loop() + end + end + + bench "enum reduce send" do + {workers, _, pids_by_partition_map} = bench_context + Enum.reduce(pids_by_partition_map, workers, fn ({partition, pids}, state) -> + {worker_pid, state} = get_worker_pid(partition, state) + Worker.send(worker_pid, pids, :hi) + state + end) + end + + bench "do_send send" do + {workers, pids_by_partition, _} = bench_context + do_send(:hi, pids_by_partition, workers, 0, tuple_size(pids_by_partition)) + end + + + defp get_worker_pid(partition, state) do + case elem(state, partition) do + nil -> + {:ok, pid} = Worker.start_link() + {pid, put_elem(state, partition, pid)} + pid -> + {pid, state} + end + end + + defp do_send(_message, _pids_by_partition, _workers, partitions, partitions), do: :ok + defp do_send(message, pids_by_partition, workers, partition, partitions) do + pids = elem(pids_by_partition, partition) + if pids != [] do + Worker.send(elem(workers, partition), pids, message) + end + do_send(message, pids_by_partition, workers, partition + 1, partitions) + end +end \ No newline at end of file diff --git a/bench/send_bench.exs b/bench/send_bench.exs new file mode 100644 index 0000000..f9268d8 --- /dev/null +++ b/bench/send_bench.exs @@ -0,0 +1,36 @@ +defmodule SendBench do + use Benchfella + + alias Manifold.Utils + + setup_all do + pids = for _ <- 0..200, do: spawn_link &loop/0 + + {:ok, pids} + end + + defp loop() do + receive do + _ -> loop() + end + end + + bench "send enum each" do + bench_context |> Enum.each(&send(&1, :hi)) + end + + bench "send list comp" do + for pid <- bench_context, do: send(pid, :hi) + end + + bench "send fast reducer" do + send_r(bench_context, :hi) + end + + defp send_r([], _msg), do: :ok + defp send_r([pid | pids], msg) do + send(pid, msg) + send_r(pids, msg) + end + +end \ No newline at end of file diff --git a/bench/send_bench_one.exs b/bench/send_bench_one.exs new file mode 100644 index 0000000..185b03a --- /dev/null +++ b/bench/send_bench_one.exs @@ -0,0 +1,37 @@ +defmodule SendBenchOne do + use Benchfella + + setup_all do + pid = spawn_link &loop/0 + {:ok, pid} + end + + defp loop() do + receive do + _ -> loop() + end + end + + bench "send enum each" do + [bench_context] |> Enum.each(&send(&1, :hi)) + end + + bench "send list comp" do + for pid <- [bench_context], do: send(pid, :hi) + end + + bench "send one" do + send(bench_context, :hi) + end + + bench "send fast reducer" do + send_r([bench_context], :hi) + end + + defp send_r([], _msg), do: :ok + defp send_r([pid | pids], msg) do + send(pid, msg) + send_r(pids, msg) + end + +end \ No newline at end of file diff --git a/lib/manifold.ex b/lib/manifold.ex index 2025e15..24169f0 100644 --- a/lib/manifold.ex +++ b/lib/manifold.ex @@ -23,21 +23,15 @@ defmodule Manifold do ## Client - @spec send([pid], term) :: :ok + @spec send([pid | nil] | pid | nil, term) :: :ok + def send([pid], message), do: __MODULE__.send(pid, message) def send(pids, message) when is_list(pids) do - pids - |> Utils.group_by(fn - nil -> nil - pid -> node(pid) - end) - |> Enum.each(fn - {nil, _pids} -> :ok - {node, pids} -> Partitioner.send({Partitioner, node}, pids, message) - end) - end - - @spec send(pid, term) :: :ok - def send(pid, message) do - __MODULE__.send([pid], message) + grouped_by = Utils.group_by(pids, fn + nil -> nil + pid -> node(pid) + end) + for {node, pids} <- grouped_by, node != nil, do: Partitioner.send({Partitioner, node}, pids, message) end + def send(pid, message) when is_pid(pid), do: Partitioner.send({Partitioner, node(pid)}, [pid], message) + def send(nil, _message), do: :ok end diff --git a/lib/manifold/partitioner.ex b/lib/manifold/partitioner.ex index 06cb926..23199b6 100644 --- a/lib/manifold/partitioner.ex +++ b/lib/manifold/partitioner.ex @@ -31,7 +31,11 @@ defmodule Manifold.Partitioner do # Set optimal process flags Process.flag(:trap_exit, true) Process.flag(:message_queue_data, :off_heap) - {:ok, Tuple.duplicate(nil, partitions)} + workers = for _ <- 0..partitions do + {:ok, pid} = Worker.start_link() + pid + end + {:ok, List.to_tuple(workers)} end def terminate(_reason, _state), do: :ok @@ -42,6 +46,7 @@ defmodule Manifold.Partitioner do end {:reply, children, state} end + def handle_call(:count_children, _from, state) do {:reply, [ specs: 1, @@ -50,20 +55,25 @@ defmodule Manifold.Partitioner do workers: tuple_size(state) ], state} end + def handle_call(_message, _from, state) do {:reply, :error, state} end + # Specialize handling cast to a single pid. + def handle_cast({:send, [pid], message}, state) do + partition = Utils.partition_for(pid, tuple_size(state)) + Worker.send(elem(state, partition), [pid], message) + {:noreply, state} + end + def handle_cast({:send, pids, message}, state) do - state = pids - |> Utils.group_by(&:erlang.phash2(&1, tuple_size(state))) - |> Enum.reduce(state, fn ({partition, pids}, state) -> - {worker_pid, state} = get_worker_pid(partition, state) - Worker.send(worker_pid, pids, message) - state - end) + partitions = tuple_size(state) + pids_by_partition = Utils.partition_pids(pids, partitions) + do_send(message, pids_by_partition, state, 0, partitions) {:noreply, state} end + def handle_cast(_message, state) do {:noreply, state} end @@ -74,26 +84,24 @@ defmodule Manifold.Partitioner do state = state |> Tuple.to_list |> Enum.map(fn - ^pid -> nil + ^pid -> Worker.start_link() pid -> pid end) |> List.to_tuple {:noreply, state} end + def handle_info(_message, state) do {:noreply, state} end - ## Private - - defp get_worker_pid(partition, state) do - case elem(state, partition) do - nil -> - {:ok, pid} = Worker.start_link - {pid, put_elem(state, partition, pid)} - pid -> - {pid, state} + defp do_send(_message, _pids_by_partition, _workers, partitions, partitions), do: :ok + defp do_send(message, pids_by_partition, workers, partition, partitions) do + pids = elem(pids_by_partition, partition) + if pids != [] do + Worker.send(elem(workers, partition), pids, message) end + do_send(message, pids_by_partition, workers, partition + 1, partitions) end end diff --git a/lib/manifold/utils.ex b/lib/manifold/utils.ex index a2db262..941fb24 100644 --- a/lib/manifold/utils.ex +++ b/lib/manifold/utils.ex @@ -6,13 +6,36 @@ defmodule Manifold.Utils do A faster version of Enum.group_by with less bells and whistles. """ @spec group_by([pid], key_fun) :: groups - def group_by(pids, key_fun), do: group_by(pids, key_fun, Map.new) + def group_by(pids, key_fun), do: group_by(pids, key_fun, %{}) @spec group_by([pid], key_fun, groups) :: groups - def group_by([pid|pids], key_fun, groups) do + defp group_by([pid | pids], key_fun, groups) do key = key_fun.(pid) - group = Map.get(groups, key) || [] - group_by(pids, key_fun, Map.put(groups, key, [pid|group])) + group = Map.get(groups, key, []) + group_by(pids, key_fun, Map.put(groups, key, [pid | group])) + end + defp group_by([], _key_fun, groups), do: groups + + @doc """ + Partitions a bunch of pids into a tuple, of lists of pids grouped by by the result of :erlang.pash2/2 + """ + @spec partition_pids([pid], integer) :: tuple + def partition_pids(pids, partitions) do + do_partition_pids(pids, partitions, Tuple.duplicate([], partitions)) + end + + defp do_partition_pids([pid | pids], partitions, pids_by_partition) do + partition = partition_for(pid, partitions) + pids_in_partition = elem(pids_by_partition, partition) + do_partition_pids(pids, partitions, put_elem(pids_by_partition, partition, [pid | pids_in_partition])) + end + defp do_partition_pids([], _partitions, pids_by_partition), do: pids_by_partition + + @doc """ + Computes the partition for a given pid using :erlang.phash2/2 + """ + @spec partition_for(pid, integer) :: integer + def partition_for(pid, partitions) do + :erlang.phash2(pid, partitions) end - def group_by([], _key_fun, groups), do: groups end diff --git a/lib/manifold/worker.ex b/lib/manifold/worker.ex index 1510305..7837cb4 100644 --- a/lib/manifold/worker.ex +++ b/lib/manifold/worker.ex @@ -2,25 +2,25 @@ defmodule Manifold.Worker do use GenServer ## Client - @spec start_link :: GenServer.on_start - def start_link do - GenServer.start_link(__MODULE__, [:ok]) - end + def start_link, do: GenServer.start_link(__MODULE__, []) @spec send(pid, [pid], term) :: :ok - def send(pid, pids, message) do - GenServer.cast(pid, {:send, pids, message}) - end + def send(pid, pids, message), do: GenServer.cast(pid, {:send, pids, message}) ## Server Callbacks + @spec init([]) :: {:ok, nil} + def init([]), do: {:ok, nil} - def handle_cast({:send, pids, message}, state) do - Enum.each(pids, &send(&1, message)) - {:noreply, state} + def handle_cast({:send, [pid], message}, nil) do + send(pid, message) + {:noreply, nil} end - def handle_cast(_message, state) do - {:noreply, state} + def handle_cast({:send, pids, message}, nil) do + for pid <- pids, do: send(pid, message) + {:noreply, nil} end + + def handle_cast(_message, nil), do: {:noreply, nil} end diff --git a/mix.exs b/mix.exs index 9fb19e4..c2f5499 100644 --- a/mix.exs +++ b/mix.exs @@ -21,7 +21,9 @@ defmodule Manifold.Mixfile do end defp deps do - [] + [ + {:benchfella, "~> 0.3.0"} + ] end def package do diff --git a/test/manifold_test.exs b/test/manifold_test.exs index 789c968..c1fa230 100644 --- a/test/manifold_test.exs +++ b/test/manifold_test.exs @@ -6,7 +6,7 @@ defmodule ManifoldTest do me = self() message = :hello pids = for _ <- 0..10000 do - spawn fn -> + spawn_link fn -> receive do message -> send(me, {self(), message}) end @@ -17,4 +17,45 @@ defmodule ManifoldTest do assert_receive {^pid, ^message} end end + + test "send to list of one" do + me = self() + message = :hello + pid = spawn_link fn -> + receive do + message -> send(me, message) + end + end + Manifold.send([pid], message) + assert_receive ^message + end + + test "send to one" do + me = self() + message = :hello + pid = spawn_link fn -> + receive do + message -> send(me, message) + end + end + Manifold.send(pid, message) + assert_receive ^message + end + + test "send to nil" do + assert Manifold.send([nil], :hi) == :ok + assert Manifold.send(nil, :hi) == :ok + end + + test "send with nil in list wont blow up" do + me = self() + message = :hello + pid = spawn_link fn -> + receive do + message -> send(me, message) + end + end + Manifold.send([nil, pid, nil], message) + assert_receive ^message + end end