Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept ranges of users in amoc_coordinator #174

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions guides/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ The coordinator reacts to new users showing up in a system, according to the *Co
The *Coordination Plan* consists of *Coordination Items*, and each of them is defined as one of the following: `{NumberOfUsers, CoordinationActions}`.
- When the `NumberOfUsers` is set to `all`, then only *Coordination Actions* with the arities `/1, /2` are handled.
The *Coordination Items* with `all` are triggered by the `timeout` event type.
- When the `NumberOfUsers` is set to a positive integer, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.
- When the `NumberOfUsers` is set to a positive integer or a range, all *Coordination Actions* with arities `/1, /2` and `/3` are handled.

Note that `NumberOfUsers` can be a range, in which case a new integer within the range will be randomly selected every time the coordinator fills a batch, to ensure a non-equal but uniform distribution of coordination.

The timeout timer is reset by calling the `add` function.
A new batch size is set in the `NumberOfUsers`. Each user in the batch calls the `add` function registering to the coordinator and triggering the *Coordination Plan*.
If more then one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
If more than one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users.
For example if the *Coordination Plan* is `[{2, Act1}, {3, Act2}]` then on the 6th user calling `add`, `Act1` will be called with 2 users passed and `Act2` will be called with 3 users passed.

*Coordination Actions* may be one of the following:
Expand Down
39 changes: 25 additions & 14 deletions src/coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,28 @@

-type name() :: atom().

-type coordination_data() :: {pid(), Data :: any()}.
-type data() :: {pid(), Data :: any()}.

-type maybe_coordination_data() :: coordination_data() | undefined.
-type maybe_coordination_data() :: data() | undefined.

-type event() :: coordinator_timeout | reset_coordinator | {coordinate, {pid(), term()}}.
-type coordination_event_type() :: coordinate | timeout | stop | reset.
-type event_type() :: coordinate | timeout | stop | reset.

-type coordination_event() :: {coordination_event_type(), non_neg_integer()}.
-type coordination_event() :: {event_type(), non_neg_integer()}.

-type coordination_action() ::
fun((coordination_event(), [coordination_data()]) -> any()) |
-type action() ::
fun((coordination_event(), [data()]) -> any()) |
fun((coordination_event(), maybe_coordination_data(), maybe_coordination_data()) -> any()) |
fun((coordination_event()) -> any()).

-type coordination_actions() :: [coordination_action()] | coordination_action().
-type coordination_actions() :: [action()] | action().

-type coordination_item() :: {NoOfUsers :: pos_integer() | all,
coordination_actions()}.
-type num_of_users() :: pos_integer() | {Min :: pos_integer(), Max :: pos_integer()} | all.

-type coordination_item() :: {num_of_users(), coordination_actions()}.

-type normalized_coordination_item() :: {NoOfUsers :: pos_integer() | all,
[coordination_action()]}.
[action()]}.

-type plan() :: [coordination_item()] | coordination_item().

Expand All @@ -51,10 +52,11 @@
-export_type([name/0,
event/0,
plan/0,
coordination_event_type/0,
event_type/0,
action/0,
data/0,
num_of_users/0,
coordination_event/0,
coordination_action/0,
coordination_data/0,
normalized_coordination_item/0]).

%%%===================================================================
Expand Down Expand Up @@ -162,7 +164,11 @@ normalize_coordination_item({NoOfUsers, Action}) when is_function(Action) ->
normalize_coordination_item({NoOfUsers, Actions}) when ?IS_N_OF_USERS(NoOfUsers),
is_list(Actions) ->
[assert_action(NoOfUsers, A) || A <- Actions],
{NoOfUsers, Actions}.
{NoOfUsers, Actions};
normalize_coordination_item({{Min, Max}, Actions}) when ?IS_POS_INT(Min), ?IS_POS_INT(Max),
Max > Min, is_list(Actions) ->
[assert_action({Min, Max}, A) || A <- Actions],
{{Min, Max}, Actions}.

assert_action(all, Action) when is_function(Action, 1);
is_function(Action, 2) ->
Expand All @@ -171,4 +177,9 @@ assert_action(N, Action) when is_integer(N),
(is_function(Action, 1) orelse
is_function(Action, 2) orelse
is_function(Action, 3)) ->
ok;
assert_action({Min, Max}, Action) when is_integer(Min), is_integer(Max),
(is_function(Action, 1) orelse
is_function(Action, 2) orelse
is_function(Action, 3)) ->
ok.
37 changes: 24 additions & 13 deletions src/coordinator/amoc_coordinator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
handle_call/3,
handle_cast/2]).

-type event_type() :: amoc_coordinator:coordination_event_type().
-type event_type() :: amoc_coordinator:event_type().
-type event() :: amoc_coordinator:coordination_event().
-type action() :: amoc_coordinator:coordination_action().
-type data() :: amoc_coordinator:coordination_data().
-type action() :: amoc_coordinator:action().
-type data() :: amoc_coordinator:data().

-record(state, {required_n = all :: pos_integer() | all,
-record(state, {configured = all :: amoc_coordinator:num_of_users(),
required_n = all :: pos_integer() | all,
n = 0 :: non_neg_integer(),
actions = [] :: [action()],
collect_data = true :: boolean(),
accumulator = [] :: [data()]}).
acc = [] :: [data()]}).

-type state() :: #state{}.

Expand Down Expand Up @@ -58,9 +59,11 @@ add(Pid, Data) ->

-spec init(amoc_coordinator:normalized_coordination_item()) -> {ok, state()}.
init({NoOfUsers, Actions}) ->
State = #state{required_n = NoOfUsers, actions = Actions},
{ok, State#state{collect_data = is_acc_required(Actions)}}.

State = #state{configured = NoOfUsers,
required_n = calculate_n(NoOfUsers),
actions = Actions,
collect_data = is_acc_required(Actions)},
{ok, State}.

-spec handle_call({reset, reset | timeout | stop}, term(), state()) ->
{reply, ok, state()} | {stop, normal, ok, state()}.
Expand All @@ -84,12 +87,12 @@ is_acc_required(Actions) ->
end, Actions).

-spec add_data(data(), state()) -> state().
add_data(Data, #state{n = N, accumulator = Acc} = State) ->
add_data(Data, #state{n = N, acc = Acc} = State) ->
NewState = case State#state.collect_data of
false ->
State#state{n = N + 1};
true ->
State#state{n = N + 1, accumulator = [Data | Acc]}
State#state{n = N + 1, acc = [Data | Acc]}
end,
maybe_reset_state(NewState).

Expand All @@ -100,13 +103,15 @@ maybe_reset_state(State) ->
State.

-spec reset_state(event_type(), state()) -> state().
reset_state(Event, #state{actions = Actions,
accumulator = Acc,
reset_state(Event, #state{configured = Config,
actions = Actions,
acc = Acc,
n = N, required_n = ReqN} = State) ->
amoc_telemetry:execute([coordinator, execute], #{count => N},
#{event => Event, configured => ReqN}),
[execute_action(Action, {Event, N}, Acc) || Action <- Actions],
State#state{accumulator = [], n = 0}.
NewN = calculate_n(Config),
State#state{required_n = NewN, n = 0, acc = []}.

-spec execute_action(action(), event(), [data()]) -> any().
execute_action(Action, Event, _) when is_function(Action, 1) ->
Expand All @@ -125,6 +130,12 @@ safe_executions(Fun, Args) ->
_:_ -> ok
end.

-spec calculate_n(amoc_coordinator:num_of_users()) -> all | pos_integer().
calculate_n({Min, Max}) ->
Min - 1 + rand:uniform(Max - Min);
calculate_n(Value) ->
Value.

-spec distinct_pairs(fun((data(), data()) -> any()), [data()]) -> any().
distinct_pairs(Fun, []) ->
Fun(undefined, undefined);
Expand Down
20 changes: 20 additions & 0 deletions test/amoc_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ all() ->
plan_normalises_successfully,
ordering_plan_sets_all_at_the_end,
failing_action_does_not_kill_the_worker,
execute_with_range_without_timeout,
execute_plan_without_timeout,
reset_plan_without_timeout,
execute_plan_with_timeout
Expand Down Expand Up @@ -45,6 +46,25 @@ init_per_testcase(_, Config) ->
end_per_testcase(_Config) ->
ok.

execute_with_range_without_timeout(_Config) ->
N = 20, Name = ?FUNCTION_NAME,

Plan = [ Range = {{5, 10}, mocked_action(range, 1)},
All = {all, mocked_action(all, 1)}],

?assertEqual(ok, amoc_coordinator:start(Name, Plan, infinity)),
[amoc_coordinator:add(Name, User) || User <- lists:seq(1, N)],

amoc_coordinator:stop(Name),
meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000),

History = meck:history(?MOCK_MOD),
NumOfEvents = length(History),
?assert(3 =< NumOfEvents andalso NumOfEvents =< 5),

nothing_after_tags(History, [all]),
assert_telemetry_events(Name, [start, {N, add}, stop]).

execute_plan_without_timeout(_Config) ->
N = 4, Name = ?FUNCTION_NAME,

Expand Down
Loading