Skip to content

Account for buffered events in dispatchers #312

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/gen_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2040,8 +2040,9 @@ defmodule GenStage do
when is_integer(counter) do
case consumers do
%{^ref => _} ->
%{dispatcher_state: dispatcher_state} = stage
dispatcher_callback(:ask, [counter, from, dispatcher_state], stage)
%{dispatcher_state: dispatcher_state, buffer: buffer} = stage
buffer_size = Buffer.estimate_size(buffer)
dispatcher_callback(:ask, [counter, buffer_size, from, dispatcher_state], stage)

%{} ->
msg = {:"$gen_consumer", {self(), ref}, {:cancel, :unknown_subscription}}
Expand Down
2 changes: 1 addition & 1 deletion lib/gen_stage/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ defmodule GenStage.Dispatcher do
It is guaranteed the reference given in `from` points to a
reference previously given in subscribe.
"""
@callback ask(demand :: pos_integer, from :: {pid, reference}, state :: term) ::
@callback ask(demand :: pos_integer, buffer_size :: pos_integer, from :: {pid, reference}, state :: term) ::
{:ok, actual_demand :: non_neg_integer, new_state}
when new_state: term

Expand Down
2 changes: 1 addition & 1 deletion lib/gen_stage/dispatchers/broadcast_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ defmodule GenStage.BroadcastDispatcher do
end

@doc false
def ask(counter, {pid, ref}, {demands, waiting, subscribed_processes}) do
def ask(counter, _buffer_size, {pid, ref}, {demands, waiting, subscribed_processes}) do
{current, selector, demands} = pop_demand(ref, demands)
demands = add_demand(current + counter, pid, ref, selector, demands)
new_min = get_min(demands)
Expand Down
5 changes: 3 additions & 2 deletions lib/gen_stage/dispatchers/demand_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ defmodule GenStage.DemandDispatcher do
end

@doc false
def ask(counter, {pid, ref}, {demands, pending, max, shuffle_demand}) do
def ask(counter, buffer_size, {pid, ref}, {demands, pending, max, shuffle_demand}) do
max = max || counter

if counter > max do
Expand All @@ -69,7 +69,8 @@ defmodule GenStage.DemandDispatcher do
demands = add_demand(current + counter, pid, ref, demands)

already_sent = min(pending, counter)
{:ok, counter - already_sent, {demands, pending - already_sent, max, shuffle_demand}}
buffered = min(already_sent, buffer_size)
{:ok, counter - already_sent + buffered, {demands, pending - already_sent, max, shuffle_demand}}
end

@doc false
Expand Down
5 changes: 3 additions & 2 deletions lib/gen_stage/dispatchers/partition_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ defmodule GenStage.PartitionDispatcher do
end

@doc false
def ask(counter, {_, ref}, {tag, hash, waiting, pending, partitions, references, infos}) do
def ask(counter, buffer_size, {_, ref}, {tag, hash, waiting, pending, partitions, references, infos}) do
partition = Map.fetch!(references, ref)
{pid, ref, demand_or_queue} = Map.fetch!(partitions, partition)

Expand All @@ -222,9 +222,10 @@ defmodule GenStage.PartitionDispatcher do

partitions = Map.put(partitions, partition, {pid, ref, demand_or_queue})
already_sent = min(pending, counter)
buffered = min(already_sent, buffer_size)
demand = counter - already_sent
pending = pending - already_sent
{:ok, demand, {tag, hash, waiting + demand, pending, partitions, references, infos}}
{:ok, demand + buffered, {tag, hash, waiting + demand, pending, partitions, references, infos}}
end

defp send_from_queue(queue, _tag, pid, ref, _partition, 0, acc, infos) do
Expand Down
30 changes: 15 additions & 15 deletions test/gen_stage/broadcast_dispatcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid, ref}, disp)
{:ok, 10, disp} = D.ask(10, 0, {pid, ref}, disp)
assert disp == {[{0, pid, ref, nil}], 10, expected_subscribers}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
Expand All @@ -49,7 +49,7 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
{:ok, 10, disp} = D.ask(10, 0, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers}

expected_subscribers = MapSet.put(expected_subscribers, pid2)
Expand All @@ -62,7 +62,7 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 0, disp} = D.cancel({pid1, ref1}, disp)
assert disp == {[{0, pid2, ref2, nil}], 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid2, ref2}, disp)
{:ok, 10, disp} = D.ask(10, 0, {pid2, ref2}, disp)
assert disp == {[{0, pid2, ref2, nil}], 10, expected_subscribers}
end

Expand All @@ -83,15 +83,15 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}

{:ok, 0, disp} = D.ask(10, {pid1, ref1}, disp)
{:ok, 0, disp} = D.ask(10, 0, {pid1, ref1}, disp)
assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, expected_subscribers}

expected_subscribers = MapSet.delete(expected_subscribers, pid2)

{:ok, 10, disp} = D.cancel({pid2, ref2}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
{:ok, 10, disp} = D.ask(10, 0, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 20, expected_subscribers}
end

Expand All @@ -107,8 +107,8 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)

{:ok, 0, disp} = D.ask(3, {pid1, ref1}, disp)
{:ok, 2, disp} = D.ask(2, {pid2, ref2}, disp)
{:ok, 0, disp} = D.ask(3, 0, {pid1, ref1}, disp)
{:ok, 2, disp} = D.ask(2, 0, {pid2, ref2}, disp)

expected_subscribers = MapSet.new([pid1, pid2])

Expand All @@ -122,7 +122,7 @@ defmodule GenStage.BroadcastDispatcherTest do
assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b]}

# A batch with left-over
{:ok, 1, disp} = D.ask(2, {pid2, ref2}, disp)
{:ok, 1, disp} = D.ask(2, 0, {pid2, ref2}, disp)

{:ok, [:d], disp} = D.dispatch([:c, :d], 2, disp)
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
Expand All @@ -135,7 +135,7 @@ defmodule GenStage.BroadcastDispatcherTest do
refute_received {:"$gen_consumer", {_, _}, _}

# Add a late subscriber
{:ok, 1, disp} = D.ask(1, {pid1, ref1}, disp)
{:ok, 1, disp} = D.ask(1, 0, {pid1, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid3, ref3}, disp)
{:ok, [:d, :e], disp} = D.dispatch([:d, :e], 2, disp)

Expand All @@ -146,9 +146,9 @@ defmodule GenStage.BroadcastDispatcherTest do
expected_subscribers}

# Even out
{:ok, 0, disp} = D.ask(2, {pid1, ref1}, disp)
{:ok, 0, disp} = D.ask(2, {pid2, ref2}, disp)
{:ok, 3, disp} = D.ask(3, {pid3, ref3}, disp)
{:ok, 0, disp} = D.ask(2, 0, {pid1, ref1}, disp)
{:ok, 0, disp} = D.ask(2, 0, {pid2, ref2}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid3, ref3}, disp)
{:ok, [], disp} = D.dispatch([:d, :e, :f], 3, disp)

assert disp ==
Expand All @@ -173,8 +173,8 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 0, disp} = D.subscribe([selector: selector2], {pid2, ref2}, disp)
assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, _} = disp

{:ok, 0, disp} = D.ask(4, {pid2, ref2}, disp)
{:ok, 4, disp} = D.ask(4, {pid1, ref1}, disp)
{:ok, 0, disp} = D.ask(4, 0, {pid2, ref2}, disp)
{:ok, 4, disp} = D.ask(4, 0, {pid1, ref1}, disp)

events = [%{key: "pref-1234"}, %{key: "pref-5678"}, %{key: "pre0000"}, %{key: "foo0000"}]
{:ok, [], _disp} = D.dispatch(events, 4, disp)
Expand All @@ -197,7 +197,7 @@ defmodule GenStage.BroadcastDispatcherTest do

{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
{:ok, 0, disp} = D.ask(3, {pid1, ref1}, disp)
{:ok, 0, disp} = D.ask(3, 0, {pid1, ref1}, disp)

{:ok, notify_disp} = D.info(:hello, disp)
assert disp == notify_disp
Expand Down
70 changes: 53 additions & 17 deletions test/gen_stage/demand_dispatcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule GenStage.DemandDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref}], 0, nil, @default_shuffle_flag}

{:ok, 10, disp} = D.ask(10, {pid, ref}, disp)
{:ok, 10, disp} = D.ask(10, 0, {pid, ref}, disp)
assert disp == {[{10, pid, ref}], 0, 10, @default_shuffle_flag}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
Expand All @@ -44,7 +44,43 @@ defmodule GenStage.DemandDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref}], 10, 10, @default_shuffle_flag}

{:ok, 0, disp} = D.ask(5, {pid, ref}, disp)
{:ok, 0, disp} = D.ask(5, 0, {pid, ref}, disp)
assert disp == {[{5, pid, ref}], 5, 10, @default_shuffle_flag}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
assert disp == {[], 10, 10, @default_shuffle_flag}
end

test "asks with buffered events" do
pid = self()
ref = make_ref()
disp = dispatcher([])

# Subscribe, ask and cancel and leave some demand
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref}], 0, nil, @default_shuffle_flag}

{:ok, 10, disp} = D.ask(10, 0, {pid, ref}, disp)
assert disp == {[{10, pid, ref}], 0, 10, @default_shuffle_flag}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
assert disp == {[], 10, 10, @default_shuffle_flag}

# Subscribe, ask with a buffer greater than the counter and cancel and leave the same demand
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref}], 10, 10, @default_shuffle_flag}

{:ok, 5, disp} = D.ask(5, 7, {pid, ref}, disp)
assert disp == {[{5, pid, ref}], 5, 10, @default_shuffle_flag}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
assert disp == {[], 10, 10, @default_shuffle_flag}

# Subscribe, ask with a buffer smaller than the counter and cancel and leave the same demand
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref}], 10, 10, @default_shuffle_flag}

{:ok, 3, disp} = D.ask(5, 3, {pid, ref}, disp)
assert disp == {[{5, pid, ref}], 5, 10, @default_shuffle_flag}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
Expand All @@ -57,14 +93,14 @@ defmodule GenStage.DemandDispatcherTest do
disp = dispatcher([])
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)

{:ok, 3, disp} = D.ask(3, {pid, ref}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref}, disp)
assert disp == {[{3, pid, ref}], 0, 3, @default_shuffle_flag}

{:ok, [], disp} = D.dispatch([:a], 1, disp)
assert disp == {[{2, pid, ref}], 0, 3, @default_shuffle_flag}
assert_received {:"$gen_consumer", {_, ^ref}, [:a]}

{:ok, 3, disp} = D.ask(3, {pid, ref}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref}, disp)
assert disp == {[{5, pid, ref}], 0, 3, @default_shuffle_flag}

{:ok, [:g, :h], disp} = D.dispatch([:b, :c, :d, :e, :f, :g, :h], 7, disp)
Expand All @@ -87,15 +123,15 @@ defmodule GenStage.DemandDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref3}, disp)

{:ok, 4, disp} = D.ask(4, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, {pid, ref3}, disp)
{:ok, 4, disp} = D.ask(4, 0, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, 0, {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref3}, disp)
assert disp == {[{4, pid, ref1}, {3, pid, ref3}, {2, pid, ref2}], 0, 4, @default_shuffle_flag}

{:ok, 2, disp} = D.ask(2, {pid, ref3}, disp)
{:ok, 2, disp} = D.ask(2, 0, {pid, ref3}, disp)
assert disp == {[{5, pid, ref3}, {4, pid, ref1}, {2, pid, ref2}], 0, 4, @default_shuffle_flag}

{:ok, 4, disp} = D.ask(4, {pid, ref2}, disp)
{:ok, 4, disp} = D.ask(4, 0, {pid, ref2}, disp)
assert disp == {[{6, pid, ref2}, {5, pid, ref3}, {4, pid, ref1}], 0, 4, @default_shuffle_flag}
end

Expand All @@ -108,8 +144,8 @@ defmodule GenStage.DemandDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)

{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, 0, {pid, ref2}, disp)
assert disp == {[{3, pid, ref1}, {2, pid, ref2}], 0, 3, @default_shuffle_flag}

# One batch fits all
Expand All @@ -123,8 +159,8 @@ defmodule GenStage.DemandDispatcherTest do
refute_received {:"$gen_consumer", {_, _}, _}

# Two batches with left over
{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 3, disp} = D.ask(3, {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref1}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref2}, disp)
assert disp == {[{3, pid, ref1}, {3, pid, ref2}], 0, 3, @default_shuffle_flag}

{:ok, [], disp} = D.dispatch([:a, :b], 2, disp)
Expand All @@ -151,8 +187,8 @@ defmodule GenStage.DemandDispatcherTest do
{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)

{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, 0, {pid, ref2}, disp)
assert disp == {[{3, pid, ref1}, {2, pid, ref2}], 0, 3, true}

# demands should be shuffled after first dispatch
Expand Down Expand Up @@ -181,7 +217,7 @@ defmodule GenStage.DemandDispatcherTest do

{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 3, disp} = D.ask(3, 0, {pid, ref1}, disp)

{:ok, notify_disp} = D.info(:hello, disp)
assert disp == notify_disp
Expand All @@ -199,7 +235,7 @@ defmodule GenStage.DemandDispatcherTest do

log =
capture_log(fn ->
{:ok, 4, disp} = D.ask(4, {pid, ref2}, disp)
{:ok, 4, disp} = D.ask(4, 0, {pid, ref2}, disp)
disp
end)

Expand Down
Loading