Skip to content

Commit

Permalink
[manifold] make partitioner faster. (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhgg authored Feb 4, 2018
2 parents 5a0fe68 + c115a17 commit 05c01cd
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 52 deletions.
48 changes: 48 additions & 0 deletions bench/group_by_bench.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule GroupByBench do
use Benchfella

alias Manifold.Utils

setup_all do
pids = for _ <- 0..5000, do: spawn_link &loop/0
{:ok, pids}
end

defp loop() do
receive do
_ -> loop()
end
end

bench "group by 48" do
bench_context
|> Utils.group_by(&:erlang.phash2(&1, 48))
end

bench "partition_pids 48" do
bench_context
|> Utils.partition_pids(48)
end

bench "group by 24" do
bench_context
|> Utils.group_by(&:erlang.phash2(&1, 24))
end

bench "partition_pids 24" do
bench_context
|> Utils.partition_pids(24)
end

bench "group by 8" do
bench_context
|> Utils.group_by(&:erlang.phash2(&1, 8))
end

bench "partition_pids 8" do
bench_context
|> Utils.partition_pids(8)
end


end
48 changes: 48 additions & 0 deletions bench/group_by_one.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule GroupByOneBench do
use Benchfella

alias Manifold.Utils

setup_all do
pids = [spawn_link &loop/0]

{:ok, pids}
end

defp loop() do
receive do
_ -> loop()
end
end


bench "group by 48" do
bench_context
|> Utils.group_by(&:erlang.phash2(&1, 48))
end

bench "partition_pids 48" do
bench_context
|> Utils.partition_pids(48)
end

bench "group by 24" do
bench_context
|> Utils.group_by(&:erlang.phash2(&1, 24))
end

bench "partition_pids 24" do
bench_context
|> Utils.partition_pids(24)
end

bench "group by 8" do
bench_context
|> Utils.group_by(&:erlang.phash2(&1, 8))
end

bench "partition_pids 8" do
bench_context
|> Utils.partition_pids(8)
end
end
77 changes: 77 additions & 0 deletions bench/partitioner_benches.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule WorkerSendBenches do
use Benchfella

alias Manifold.Utils

defmodule Worker do
use GenServer

## Client
@spec start_link :: GenServer.on_start
def start_link, do: GenServer.start_link(__MODULE__, [])

@spec send(pid, [pid], term) :: :ok
def send(pid, pids, message), do: GenServer.cast(pid, {:send, pids, message})

## Server Callbacks
@spec init([]) :: {:ok, nil}
def init([]), do: {:ok, nil}

def handle_cast({:send, _pids, _message}, nil) do
{:noreply, nil}
end

def handle_cast(_message, nil), do: {:noreply, nil}
end


setup_all do
workers = (for _ <- 0..15, do: Worker.start_link() |> elem(1)) |> List.to_tuple
pids = for _ <- 0..200, do: spawn_link &loop/0

pids_by_partition = Utils.partition_pids(pids, tuple_size(workers))
pids_by_partition_map = Utils.group_by(pids, &Utils.partition_for(&1, tuple_size(workers)))

{:ok, {workers, pids_by_partition, pids_by_partition_map}}
end

defp loop() do
receive do
_ -> loop()
end
end

bench "enum reduce send" do
{workers, _, pids_by_partition_map} = bench_context
Enum.reduce(pids_by_partition_map, workers, fn ({partition, pids}, state) ->
{worker_pid, state} = get_worker_pid(partition, state)
Worker.send(worker_pid, pids, :hi)
state
end)
end

bench "do_send send" do
{workers, pids_by_partition, _} = bench_context
do_send(:hi, pids_by_partition, workers, 0, tuple_size(pids_by_partition))
end


defp get_worker_pid(partition, state) do
case elem(state, partition) do
nil ->
{:ok, pid} = Worker.start_link()
{pid, put_elem(state, partition, pid)}
pid ->
{pid, state}
end
end

defp do_send(_message, _pids_by_partition, _workers, partitions, partitions), do: :ok
defp do_send(message, pids_by_partition, workers, partition, partitions) do
pids = elem(pids_by_partition, partition)
if pids != [] do
Worker.send(elem(workers, partition), pids, message)
end
do_send(message, pids_by_partition, workers, partition + 1, partitions)
end
end
77 changes: 77 additions & 0 deletions bench/partitioner_benches_one.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule WorkerSendOneBenches do
use Benchfella

alias Manifold.Utils

defmodule Worker do
use GenServer

## Client
@spec start_link :: GenServer.on_start
def start_link, do: GenServer.start_link(__MODULE__, [])

@spec send(pid, [pid], term) :: :ok
def send(pid, pids, message), do: GenServer.cast(pid, {:send, pids, message})

## Server Callbacks
@spec init([]) :: {:ok, nil}
def init([]), do: {:ok, nil}

def handle_cast({:send, _pids, _message}, nil) do
{:noreply, nil}
end

def handle_cast(_message, nil), do: {:noreply, nil}
end


setup_all do
workers = (for _ <- 0..47, do: Worker.start_link() |> elem(1)) |> List.to_tuple
pids = [spawn_link &loop/0]

pids_by_partition = Utils.partition_pids(pids, tuple_size(workers))
pids_by_partition_map = Utils.group_by(pids, &Utils.partition_for(&1, tuple_size(workers)))

{:ok, {workers, pids_by_partition, pids_by_partition_map}}
end

defp loop() do
receive do
_ -> loop()
end
end

bench "enum reduce send" do
{workers, _, pids_by_partition_map} = bench_context
Enum.reduce(pids_by_partition_map, workers, fn ({partition, pids}, state) ->
{worker_pid, state} = get_worker_pid(partition, state)
Worker.send(worker_pid, pids, :hi)
state
end)
end

bench "do_send send" do
{workers, pids_by_partition, _} = bench_context
do_send(:hi, pids_by_partition, workers, 0, tuple_size(pids_by_partition))
end


defp get_worker_pid(partition, state) do
case elem(state, partition) do
nil ->
{:ok, pid} = Worker.start_link()
{pid, put_elem(state, partition, pid)}
pid ->
{pid, state}
end
end

defp do_send(_message, _pids_by_partition, _workers, partitions, partitions), do: :ok
defp do_send(message, pids_by_partition, workers, partition, partitions) do
pids = elem(pids_by_partition, partition)
if pids != [] do
Worker.send(elem(workers, partition), pids, message)
end
do_send(message, pids_by_partition, workers, partition + 1, partitions)
end
end
36 changes: 36 additions & 0 deletions bench/send_bench.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule SendBench do
use Benchfella

alias Manifold.Utils

setup_all do
pids = for _ <- 0..200, do: spawn_link &loop/0

{:ok, pids}
end

defp loop() do
receive do
_ -> loop()
end
end

bench "send enum each" do
bench_context |> Enum.each(&send(&1, :hi))
end

bench "send list comp" do
for pid <- bench_context, do: send(pid, :hi)
end

bench "send fast reducer" do
send_r(bench_context, :hi)
end

defp send_r([], _msg), do: :ok
defp send_r([pid | pids], msg) do
send(pid, msg)
send_r(pids, msg)
end

end
37 changes: 37 additions & 0 deletions bench/send_bench_one.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule SendBenchOne do
use Benchfella

setup_all do
pid = spawn_link &loop/0
{:ok, pid}
end

defp loop() do
receive do
_ -> loop()
end
end

bench "send enum each" do
[bench_context] |> Enum.each(&send(&1, :hi))
end

bench "send list comp" do
for pid <- [bench_context], do: send(pid, :hi)
end

bench "send one" do
send(bench_context, :hi)
end

bench "send fast reducer" do
send_r([bench_context], :hi)
end

defp send_r([], _msg), do: :ok
defp send_r([pid | pids], msg) do
send(pid, msg)
send_r(pids, msg)
end

end
24 changes: 9 additions & 15 deletions lib/manifold.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,15 @@ defmodule Manifold do

## Client

@spec send([pid], term) :: :ok
@spec send([pid | nil] | pid | nil, term) :: :ok
def send([pid], message), do: __MODULE__.send(pid, message)
def send(pids, message) when is_list(pids) do
pids
|> Utils.group_by(fn
nil -> nil
pid -> node(pid)
end)
|> Enum.each(fn
{nil, _pids} -> :ok
{node, pids} -> Partitioner.send({Partitioner, node}, pids, message)
end)
end

@spec send(pid, term) :: :ok
def send(pid, message) do
__MODULE__.send([pid], message)
grouped_by = Utils.group_by(pids, fn
nil -> nil
pid -> node(pid)
end)
for {node, pids} <- grouped_by, node != nil, do: Partitioner.send({Partitioner, node}, pids, message)
end
def send(pid, message) when is_pid(pid), do: Partitioner.send({Partitioner, node(pid)}, [pid], message)
def send(nil, _message), do: :ok
end
Loading

0 comments on commit 05c01cd

Please # to comment.