Skip to content

Commit

Permalink
Merge pull request #12 from thalesmg/cancel-job
Browse files Browse the repository at this point in the history
Cancel job
  • Loading branch information
thalesmg authored Aug 11, 2022
2 parents 89a6391 + aa11521 commit f265695
Show file tree
Hide file tree
Showing 8 changed files with 599 additions and 22 deletions.
151 changes: 141 additions & 10 deletions lib/turma/decurio.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,24 @@ defmodule Turma.Decurio do
inventory: inventory(),
name: binary()
}
@type selector() ::
:all
| [tag()]
| Regex.t()
| (inventory() -> [peer()])
@type result() ::
:pending
| {:done, term()}
| {:error, term()}
| {:throw, term()}
@type state() :: %{
my_name: binary(),
inventory: inventory(),
router_sock: pid(),
dealer_sock: pid(),
receiver_pid: pid(),
responses: %{job_id() => %{peer() => result()}}
}

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
Expand Down Expand Up @@ -72,6 +85,12 @@ defmodule Turma.Decurio do
GenServer.call(__MODULE__, :get_all)
end

@spec cancel(job_id()) :: :ok
@spec cancel(job_id(), selector()) :: :ok
def cancel(id, selector \\ :all) do
GenServer.call(__MODULE__, {:cancel, id, to_selector(selector)})
end

@spec set_inventory(inventory()) :: :ok
def set_inventory(inventory = %{}) do
GenServer.call(__MODULE__, {:set_inventory, inventory})
Expand Down Expand Up @@ -133,15 +152,11 @@ defmodule Turma.Decurio do

@impl GenServer
def init(opts) do
Process.flag(:trap_exit, true)
inventory = Map.get(opts, :inventory, %{})
my_name = Map.get(opts, :name, "decurio")
{:ok, router_sock} = :chumak.socket(:router)

dealer_sock =
case :chumak.socket(:dealer, to_charlist(my_name)) do
{:ok, dealer_sock} -> dealer_sock
{:error, {:already_started, dealer_sock}} -> dealer_sock
end
{:ok, dealer_sock} = :chumak.socket(:dealer, to_charlist(my_name))

connect_all(inventory, router_sock, dealer_sock)
receiver_pid = spawn_link(__MODULE__, :receiver_loop, [self(), dealer_sock])
Expand Down Expand Up @@ -206,6 +221,35 @@ defmodule Turma.Decurio do
{:reply, res, state}
end

def handle_call({:cancel, id, selector}, _from, state) do
{reply, state} =
case Map.fetch(state.responses, id) do
{:ok, results} ->
%{
cancel_packets: cancel_packets,
canceled_results: canceled_results,
remaining: remaining,
notify_caller?: notify_caller?,
responses: responses
} = cancel_job(id, selector, results, state)

Enum.each(cancel_packets, fn packet ->
:chumak.send_multipart(state.router_sock, packet)
end)

if notify_caller? do
send(remaining[@caller], {:job_finished, id})
end

{canceled_results, %{state | responses: responses}}

:error ->
{nil, state}
end

{:reply, {:ok, reply}, state}
end

def handle_call(_call, _from, state) do
{:reply, {:error, :bad_call}, state}
end
Expand All @@ -225,6 +269,8 @@ defmodule Turma.Decurio do
"response for #{inspect(id)}, #{inspect(responder)}: #{inspect(res, pretty: true)}"
)

# FIXME: race condition: we receive a response for a cancelled
# job.
results =
state.responses
|> Map.fetch!(id)
Expand All @@ -251,6 +297,13 @@ defmodule Turma.Decurio do
{:noreply, state}
end

@impl GenServer
def terminate(_reason, state) do
:chumak.stop(state.dealer_sock)
:chumak.stop(state.router_sock)
state
end

defp connect_all(inventory, router_sock, dealer_sock) do
inventory
|> Map.values()
Expand Down Expand Up @@ -281,27 +334,28 @@ defmodule Turma.Decurio do
end
end

@spec match_peers(inventory(), selector()) :: [peer()]
def match_peers(inventory, :all) do
inventory
|> Map.values()
|> Stream.flat_map(& &1)
|> Enum.dedup()
|> Enum.uniq()
end

def match_peers(inventory, {:tags, tags}) do
inventory
|> Map.take(tags)
|> Map.values()
|> Stream.flat_map(& &1)
|> Enum.dedup()
|> Enum.uniq()
end

def match_peers(inventory, {:regex, regex}) do
inventory
|> Map.values()
|> Stream.flat_map(& &1)
|> Stream.filter(&(&1 =~ regex))
|> Enum.dedup()
|> Enum.uniq()
end

def match_peers(inventory, {:filter, filter}) do
Expand All @@ -313,7 +367,84 @@ defmodule Turma.Decurio do
inventory
|> filter.()
|> Stream.filter(&MapSet.member?(all, &1))
|> Enum.dedup()
|> Enum.uniq()
end

@spec cancel_job(
job_id(),
selector(),
%{peer() => result()},
state()
) :: %{
cancel_packets: [iodata()],
remaining: %{peer() => result()},
notify_caller?: boolean(),
responses: %{job_id() => %{peer() => result()}}
}
def cancel_job(job_id, selector, results, state) do
canceled_results =
results
|> clean_results()
|> Map.take(match_peers(state.inventory, selector))

matched = Map.keys(canceled_results)

cancel_packets =
Enum.map(canceled_results, fn {peer, _} ->
[peer, @prefix, :erlang.term_to_binary({:cancel, job_id, state.my_name})]
end)

remaining =
Enum.reduce(matched, results, fn peer, acc ->
acc
|> Map.update!(@expected, &(&1 - 1))
|> Map.update!(@returned, fn x ->
case acc[peer] do
:pending -> x
_ -> x - 1
end
end)
|> Map.delete(peer)
end)

rem_exp = Map.fetch!(remaining, @expected)
rem_ret = Map.fetch!(remaining, @returned)
notify_caller? = rem_exp > 0 and rem_exp == rem_ret

responses =
if rem_exp == 0 do
Map.delete(state.responses, job_id)
else
Map.put(state.responses, job_id, remaining)
end

%{
cancel_packets: cancel_packets,
canceled_results: canceled_results,
remaining: remaining,
notify_caller?: notify_caller?,
responses: responses
}
end

defp to_selector(:all) do
:all
end

defp to_selector("" <> tag) do
to_selector([tag])
end

defp to_selector(tags) when is_list(tags) do
{:tags, tags}
end

defp to_selector(regex = %Regex{}) do
{:regex, regex}
end

defp to_selector(filter) when is_function(filter, 1) do
{:filter, filter}
end

defp clean_results(results) do
Expand Down
41 changes: 29 additions & 12 deletions lib/turma/legionarius.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ defmodule Turma.Legionarius do

@impl GenServer
def init(opts) do
dealer_sock =
case :chumak.socket(:dealer, to_charlist(opts.id)) do
{:ok, dealer_sock} -> dealer_sock
{:error, {:already_started, dealer_sock}} -> dealer_sock
end

Process.flag(:trap_exit, true)
{:ok, dealer_sock} = :chumak.socket(:dealer, to_charlist(opts.id))
{:ok, router_sock} = :chumak.socket(:router)
{iface, base_port_num} = Map.get(opts, :bind, {"localhost", 9877})
{:ok, _bind_pid0} = :chumak.bind(dealer_sock, :tcp, to_charlist(iface), base_port_num)
Expand All @@ -32,6 +28,7 @@ defmodule Turma.Legionarius do

{:ok,
%{
jobs: %{},
my_id: opts.id,
dealer_sock: dealer_sock,
router_sock: router_sock,
Expand Down Expand Up @@ -62,15 +59,27 @@ defmodule Turma.Legionarius do
is_binary(identity) and
is_reference(id) ->
Logger.info("running something...")
run(id, identity, fun)
:ok
job_pid = run(id, identity, fun)
state = put_in(state, [:jobs, id], %{job_pid: job_pid})
{:noreply, state}

{:cancel, id, decurio_identity} ->
Logger.info("cancelling #{inspect(id)} (#{inspect(decurio_identity)})...")
job_info = state.jobs[id]

try do
Process.exit(job_info[:job_pid], :kill)
rescue
_ -> :ok
end

{_, state} = pop_in(state, [:jobs, id])
{:noreply, state}

x ->
Logger.info("got something weird... #{inspect(x)}")
:ok
{:noreply, state}
end

{:noreply, state}
end

def handle_info({:done, id, decurio_identity, res}, state) do
Expand All @@ -79,6 +88,7 @@ defmodule Turma.Legionarius do
:erlang.term_to_binary({:result, id, state.my_id, res})
])

{_, state} = pop_in(state, [:jobs, id])
{:noreply, state}
end

Expand All @@ -87,6 +97,13 @@ defmodule Turma.Legionarius do
{:noreply, state}
end

@impl GenServer
def terminate(_reason, state) do
:chumak.stop(state.dealer_sock)
:chumak.stop(state.router_sock)
state
end

def receiver_loop(parent, dealer_sock) do
case :chumak.recv_multipart(dealer_sock) do
{:ok, data} ->
Expand All @@ -102,7 +119,7 @@ defmodule Turma.Legionarius do
defp run(id, identity, fun) do
parent = self()

spawn(fn ->
spawn_link(fn ->
res =
try do
{:done, fun.()}
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Turma.MixProject do
defp deps do
[
{:chumak, github: "zeromq/chumak", ref: "f89a873ea2d4b94d3d415228b72cf4f92f076af5"},
{:stream_data, "~> 0.5.0", only: [:test]},
{:dialyxir, "~> 1.2", only: [:dev], runtime: false}
]
end
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
"chumak": {:git, "https://github.com/zeromq/chumak.git", "f89a873ea2d4b94d3d415228b72cf4f92f076af5", [ref: "f89a873ea2d4b94d3d415228b72cf4f92f076af5"]},
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"},
}
Loading

0 comments on commit f265695

Please # to comment.