diff --git a/guides/coordinator.md b/guides/coordinator.md index 2727d7c4..0c6dfacc 100644 --- a/guides/coordinator.md +++ b/guides/coordinator.md @@ -10,11 +10,13 @@ The coordinator reacts to new users showing up in a system, according to the *Co The *Coordination Plan* consists of *Coordination Items*, and each of them is defined as one of the following: `{NumberOfUsers, CoordinationActions}`. - When the `NumberOfUsers` is set to `all`, then only *Coordination Actions* with the arities `/1, /2` are handled. The *Coordination Items* with `all` are triggered by the `timeout` event type. - - When the `NumberOfUsers` is set to a positive integer, all *Coordination Actions* with arities `/1, /2` and `/3` are handled. + - When the `NumberOfUsers` is set to a positive integer or a range, all *Coordination Actions* with arities `/1, /2` and `/3` are handled. + +Note that `NumberOfUsers` can be a range, in which case a new integer within the range will be randomly selected every time the coordinator fills a batch, to ensure a non-equal but uniform distribution of coordination. The timeout timer is reset by calling the `add` function. A new batch size is set in the `NumberOfUsers`. Each user in the batch calls the `add` function registering to the coordinator and triggering the *Coordination Plan*. -If more then one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users. +If more than one of the *Coordination Items* matching the `NumberOfUsers` is triggered, each of them will be passed a respective number of users. For example if the *Coordination Plan* is `[{2, Act1}, {3, Act2}]` then on the 6th user calling `add`, `Act1` will be called with 2 users passed and `Act2` will be called with 3 users passed. *Coordination Actions* may be one of the following: diff --git a/src/coordinator/amoc_coordinator.erl b/src/coordinator/amoc_coordinator.erl index c9792c39..de56c8e9 100644 --- a/src/coordinator/amoc_coordinator.erl +++ b/src/coordinator/amoc_coordinator.erl @@ -21,27 +21,28 @@ -type name() :: atom(). --type coordination_data() :: {pid(), Data :: any()}. +-type data() :: {pid(), Data :: any()}. --type maybe_coordination_data() :: coordination_data() | undefined. +-type maybe_coordination_data() :: data() | undefined. -type event() :: coordinator_timeout | reset_coordinator | {coordinate, {pid(), term()}}. --type coordination_event_type() :: coordinate | timeout | stop | reset. +-type event_type() :: coordinate | timeout | stop | reset. --type coordination_event() :: {coordination_event_type(), non_neg_integer()}. +-type coordination_event() :: {event_type(), non_neg_integer()}. --type coordination_action() :: - fun((coordination_event(), [coordination_data()]) -> any()) | +-type action() :: + fun((coordination_event(), [data()]) -> any()) | fun((coordination_event(), maybe_coordination_data(), maybe_coordination_data()) -> any()) | fun((coordination_event()) -> any()). --type coordination_actions() :: [coordination_action()] | coordination_action(). +-type coordination_actions() :: [action()] | action(). --type coordination_item() :: {NoOfUsers :: pos_integer() | all, - coordination_actions()}. +-type num_of_users() :: pos_integer() | {Min :: pos_integer(), Max :: pos_integer()} | all. + +-type coordination_item() :: {num_of_users(), coordination_actions()}. -type normalized_coordination_item() :: {NoOfUsers :: pos_integer() | all, - [coordination_action()]}. + [action()]}. -type plan() :: [coordination_item()] | coordination_item(). @@ -51,10 +52,11 @@ -export_type([name/0, event/0, plan/0, - coordination_event_type/0, + event_type/0, + action/0, + data/0, + num_of_users/0, coordination_event/0, - coordination_action/0, - coordination_data/0, normalized_coordination_item/0]). %%%=================================================================== @@ -162,7 +164,11 @@ normalize_coordination_item({NoOfUsers, Action}) when is_function(Action) -> normalize_coordination_item({NoOfUsers, Actions}) when ?IS_N_OF_USERS(NoOfUsers), is_list(Actions) -> [assert_action(NoOfUsers, A) || A <- Actions], - {NoOfUsers, Actions}. + {NoOfUsers, Actions}; +normalize_coordination_item({{Min, Max}, Actions}) when ?IS_POS_INT(Min), ?IS_POS_INT(Max), + Max > Min, is_list(Actions) -> + [assert_action({Min, Max}, A) || A <- Actions], + {{Min, Max}, Actions}. assert_action(all, Action) when is_function(Action, 1); is_function(Action, 2) -> @@ -171,4 +177,9 @@ assert_action(N, Action) when is_integer(N), (is_function(Action, 1) orelse is_function(Action, 2) orelse is_function(Action, 3)) -> + ok; +assert_action({Min, Max}, Action) when is_integer(Min), is_integer(Max), + (is_function(Action, 1) orelse + is_function(Action, 2) orelse + is_function(Action, 3)) -> ok. diff --git a/src/coordinator/amoc_coordinator_worker.erl b/src/coordinator/amoc_coordinator_worker.erl index 3701211a..93b743bd 100644 --- a/src/coordinator/amoc_coordinator_worker.erl +++ b/src/coordinator/amoc_coordinator_worker.erl @@ -15,16 +15,17 @@ handle_call/3, handle_cast/2]). --type event_type() :: amoc_coordinator:coordination_event_type(). +-type event_type() :: amoc_coordinator:event_type(). -type event() :: amoc_coordinator:coordination_event(). --type action() :: amoc_coordinator:coordination_action(). --type data() :: amoc_coordinator:coordination_data(). +-type action() :: amoc_coordinator:action(). +-type data() :: amoc_coordinator:data(). --record(state, {required_n = all :: pos_integer() | all, +-record(state, {configured = all :: amoc_coordinator:num_of_users(), + required_n = all :: pos_integer() | all, n = 0 :: non_neg_integer(), actions = [] :: [action()], collect_data = true :: boolean(), - accumulator = [] :: [data()]}). + acc = [] :: [data()]}). -type state() :: #state{}. @@ -58,9 +59,11 @@ add(Pid, Data) -> -spec init(amoc_coordinator:normalized_coordination_item()) -> {ok, state()}. init({NoOfUsers, Actions}) -> - State = #state{required_n = NoOfUsers, actions = Actions}, - {ok, State#state{collect_data = is_acc_required(Actions)}}. - + State = #state{configured = NoOfUsers, + required_n = calculate_n(NoOfUsers), + actions = Actions, + collect_data = is_acc_required(Actions)}, + {ok, State}. -spec handle_call({reset, reset | timeout | stop}, term(), state()) -> {reply, ok, state()} | {stop, normal, ok, state()}. @@ -84,12 +87,12 @@ is_acc_required(Actions) -> end, Actions). -spec add_data(data(), state()) -> state(). -add_data(Data, #state{n = N, accumulator = Acc} = State) -> +add_data(Data, #state{n = N, acc = Acc} = State) -> NewState = case State#state.collect_data of false -> State#state{n = N + 1}; true -> - State#state{n = N + 1, accumulator = [Data | Acc]} + State#state{n = N + 1, acc = [Data | Acc]} end, maybe_reset_state(NewState). @@ -100,13 +103,15 @@ maybe_reset_state(State) -> State. -spec reset_state(event_type(), state()) -> state(). -reset_state(Event, #state{actions = Actions, - accumulator = Acc, +reset_state(Event, #state{configured = Config, + actions = Actions, + acc = Acc, n = N, required_n = ReqN} = State) -> amoc_telemetry:execute([coordinator, execute], #{count => N}, #{event => Event, configured => ReqN}), [execute_action(Action, {Event, N}, Acc) || Action <- Actions], - State#state{accumulator = [], n = 0}. + NewN = calculate_n(Config), + State#state{required_n = NewN, n = 0, acc = []}. -spec execute_action(action(), event(), [data()]) -> any(). execute_action(Action, Event, _) when is_function(Action, 1) -> @@ -125,6 +130,12 @@ safe_executions(Fun, Args) -> _:_ -> ok end. +-spec calculate_n(amoc_coordinator:num_of_users()) -> all | pos_integer(). +calculate_n({Min, Max}) -> + Min - 1 + rand:uniform(Max - Min); +calculate_n(Value) -> + Value. + -spec distinct_pairs(fun((data(), data()) -> any()), [data()]) -> any(). distinct_pairs(Fun, []) -> Fun(undefined, undefined); diff --git a/test/amoc_coordinator_SUITE.erl b/test/amoc_coordinator_SUITE.erl index 1f977a8a..cc7a4cf1 100644 --- a/test/amoc_coordinator_SUITE.erl +++ b/test/amoc_coordinator_SUITE.erl @@ -14,6 +14,7 @@ all() -> plan_normalises_successfully, ordering_plan_sets_all_at_the_end, failing_action_does_not_kill_the_worker, + execute_with_range_without_timeout, execute_plan_without_timeout, reset_plan_without_timeout, execute_plan_with_timeout @@ -45,6 +46,25 @@ init_per_testcase(_, Config) -> end_per_testcase(_Config) -> ok. +execute_with_range_without_timeout(_Config) -> + N = 20, Name = ?FUNCTION_NAME, + + Plan = [ Range = {{5, 10}, mocked_action(range, 1)}, + All = {all, mocked_action(all, 1)}], + + ?assertEqual(ok, amoc_coordinator:start(Name, Plan, infinity)), + [amoc_coordinator:add(Name, User) || User <- lists:seq(1, N)], + + amoc_coordinator:stop(Name), + meck:wait(length(Plan), ?MOCK_MOD, f_1, ['_', {stop, '_'}], 1000), + + History = meck:history(?MOCK_MOD), + NumOfEvents = length(History), + ?assert(3 =< NumOfEvents andalso NumOfEvents =< 5), + + nothing_after_tags(History, [all]), + assert_telemetry_events(Name, [start, {N, add}, stop]). + execute_plan_without_timeout(_Config) -> N = 4, Name = ?FUNCTION_NAME,