From 1fa358d8c93e41173e052dbff3daebde41a711e4 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Fri, 7 Jun 2024 15:46:17 -0700 Subject: [PATCH 01/15] Port in csrt changes --- src/chttpd/src/chttpd.erl | 5 + src/chttpd/src/chttpd_db.erl | 2 + src/chttpd/src/chttpd_httpd_handlers.erl | 1 + src/chttpd/src/chttpd_misc.erl | 105 ++++++++++++++++++ src/couch/include/couch_db.hrl | 2 + src/couch/priv/stats_descriptions.cfg | 33 ++++++ src/couch/src/couch_btree.erl | 3 + src/couch/src/couch_db.erl | 2 + src/couch/src/couch_query_servers.erl | 8 ++ src/couch/src/couch_server.erl | 2 + src/couch_stats/src/couch_stats.app.src | 8 +- src/couch_stats/src/couch_stats.erl | 30 +++++ src/couch_stats/src/couch_stats_sup.erl | 1 + src/fabric/priv/stats_descriptions.cfg | 50 +++++++++ src/fabric/src/fabric_rpc.erl | 15 +++ src/fabric/src/fabric_util.erl | 4 + .../test/eunit/fabric_rpc_purge_tests.erl | 2 + src/fabric/test/eunit/fabric_rpc_tests.erl | 11 +- src/mango/src/mango_cursor_view.erl | 2 + src/mango/src/mango_selector.erl | 1 + src/mem3/src/mem3_rpc.erl | 34 ++++-- src/rexi/include/rexi.hrl | 1 + src/rexi/src/rexi.erl | 18 ++- src/rexi/src/rexi_monitor.erl | 1 + src/rexi/src/rexi_server.erl | 26 ++++- src/rexi/src/rexi_utils.erl | 34 +++++- 26 files changed, 377 insertions(+), 24 deletions(-) diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 0c138086214..9642b208957 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -323,6 +323,9 @@ handle_request_int(MochiReq) -> % Save client socket so that it can be monitored for disconnects chttpd_util:mochiweb_client_req_set(MochiReq), + %% This is probably better in before_request, but having Path is nice + couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path), + {HttpReq2, Response} = case before_request(HttpReq0) of {ok, HttpReq1} -> @@ -372,6 +375,7 @@ after_request(HttpReq, HttpResp0) -> HttpResp2 = update_stats(HttpReq, HttpResp1), chttpd_stats:report(HttpReq, HttpResp2), maybe_log(HttpReq, HttpResp2), + couch_stats_resource_tracker:destroy_context(), HttpResp2. process_request(#httpd{mochi_req = MochiReq} = HttpReq) -> @@ -409,6 +413,7 @@ handle_req_after_auth(HandlerKey, HttpReq) -> HandlerKey, fun chttpd_db:handle_request/1 ), + couch_stats_resource_tracker:set_context_handler_fun(HandlerFun), AuthorizedReq = chttpd_auth:authorize( possibly_hack(HttpReq), fun chttpd_auth_request:authorize_request/1 diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 062e0bf2403..8a816abdef3 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -83,6 +83,7 @@ % Database request handlers handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) -> + couch_stats_resource_tracker:set_context_dbname(DbName), case {Method, RestParts} of {'PUT', []} -> create_db_req(Req, DbName); @@ -103,6 +104,7 @@ handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) do_db_req(Req, fun db_req/2); {_, [SecondPart | _]} -> Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2), + couch_stats_resource_tracker:set_context_handler_fun(Handler), do_db_req(Req, Handler) end. 
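Note on the shape of the change so far: the chttpd hooks above give every HTTP request a coordinator context that is created before auth, enriched as routing learns more, and torn down in after_request/2. A minimal sketch of that lifecycle, using only functions introduced in this series (the db name is illustrative):

    %% Sketch of the per-request coordinator context lifecycle wired up above.
    demo_request_lifecycle(HttpReq0, Path) ->
        %% created early so the whole request is attributable
        couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path),
        try
            %% filled in as the request is routed (see chttpd_db:handle_request/1)
            couch_stats_resource_tracker:set_context_dbname(<<"mydb">>),
            ok
        after
            %% always destroyed in after_request/2
            couch_stats_resource_tracker:destroy_context()
        end.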
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index 932b52e5f6e..e1b26022204 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
 url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1;
 url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
 url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 4b7c73b3556..d35df68e5af 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -20,6 +20,7 @@
     handle_replicate_req/1,
     handle_reload_query_servers_req/1,
     handle_task_status_req/1,
+    handle_resource_status_req/1,
     handle_up_req/1,
     handle_utils_dir_req/1,
     handle_utils_dir_req/2,
@@ -224,6 +225,110 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) ->
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").

+handle_resource_status_req(#httpd{method = 'POST'} = Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    chttpd:validate_ctype(Req, "application/json"),
+    {Props} = chttpd:json_body_obj(Req),
+    Action = proplists:get_value(<<"action">>, Props),
+    Key = proplists:get_value(<<"key">>, Props),
+    Val = proplists:get_value(<<"val">>, Props),
+
+    CountBy = fun couch_stats_resource_tracker:count_by/1,
+    GroupBy = fun couch_stats_resource_tracker:group_by/2,
+    SortedBy1 = fun couch_stats_resource_tracker:sorted_by/1,
+    SortedBy2 = fun couch_stats_resource_tracker:sorted_by/2,
+    ConvertEle = fun(K) -> list_to_existing_atom(binary_to_list(K)) end,
+    ConvertList = fun(L) -> [ConvertEle(E) || E <- L] end,
+    ToJson = fun couch_stats_resource_tracker:term_to_flat_json/1,
+    JsonKeys = fun(PL) -> [[ToJson(K), V] || {K, V} <- PL] end,
+
+    Fun = case {Action, Key, Val} of
+        {<<"count_by">>, Keys, undefined} when is_list(Keys) ->
+            Keys1 = [ConvertEle(K) || K <- Keys],
+            fun() -> CountBy(Keys1) end;
+        {<<"count_by">>, Key, undefined} ->
+            Key1 = ConvertEle(Key),
+            fun() -> CountBy(Key1) end;
+        {<<"group_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) ->
+            Keys1 = ConvertList(Keys),
+            Vals1 = ConvertList(Vals),
+            fun() -> GroupBy(Keys1, Vals1) end;
+        {<<"group_by">>, Key, Vals} when is_list(Vals) ->
+            Key1 = ConvertEle(Key),
+            Vals1 = ConvertList(Vals),
+            fun() -> GroupBy(Key1, Vals1) end;
+        {<<"group_by">>, Keys, Val} when is_list(Keys) ->
+            Keys1 = ConvertList(Keys),
+            Val1 = ConvertEle(Val),
+            fun() -> GroupBy(Keys1, Val1) end;
+        {<<"group_by">>, Key, Val} ->
+            Key1 = ConvertEle(Key),
+            Val1 = ConvertEle(Val),
+            fun() -> GroupBy(Key1, Val1) end;
+
+        %% NB: the is_list/1 guarded clause must come before the unguarded
+        %% single-key clause, otherwise the list form is unreachable
+        {<<"sorted_by">>, Keys, undefined} when is_list(Keys) ->
+            Keys1 = [ConvertEle(K) || K <- Keys],
+            fun() -> JsonKeys(SortedBy1(Keys1)) end;
+        {<<"sorted_by">>, Key, undefined} ->
+            Key1 = ConvertEle(Key),
+            fun() -> JsonKeys(SortedBy1(Key1)) end;
+        {<<"sorted_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) ->
+            Keys1 = ConvertList(Keys),
+            Vals1 = ConvertList(Vals),
+            fun() -> JsonKeys(SortedBy2(Keys1, Vals1)) end;
+        {<<"sorted_by">>, Key, Vals} when is_list(Vals) ->
+            Key1 = ConvertEle(Key),
+            Vals1 = ConvertList(Vals),
+            fun() -> JsonKeys(SortedBy2(Key1, Vals1)) end;
+        {<<"sorted_by">>, Keys, Val} when is_list(Keys) ->
+            Keys1 = ConvertList(Keys),
+            Val1 = ConvertEle(Val),
+            fun() -> JsonKeys(SortedBy2(Keys1, Val1)) end;
+        {<<"sorted_by">>, Key, Val} ->
+            Key1 = ConvertEle(Key),
+            Val1 = ConvertEle(Val),
+            fun() -> JsonKeys(SortedBy2(Key1, Val1)) end;
+        _ ->
+            throw({badrequest, invalid_resource_request})
+    end,
+
+    Fun1 = fun() ->
+        case Fun() of
+            Map when is_map(Map) ->
+                {maps:fold(
+                    fun
+                        (_K, 0, A) -> A; %% TODO: Skip 0 value entries?
+                        (K, V, A) -> [{ToJson(K), V} | A]
+                    end,
+                    [], Map)};
+            List when is_list(List) ->
+                List
+        end
+    end,
+
+    {Resp, _Bad} = rpc:multicall(erlang, apply, [
+        fun() ->
+            {node(), Fun1()}
+        end,
+        []
+    ]),
+    %%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]),
+    send_json(Req, {Resp});
+handle_resource_status_req(#httpd{method = 'GET'} = Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    {Resp, Bad} = rpc:multicall(erlang, apply, [
+        fun() ->
+            {node(), couch_stats_resource_tracker:active()}
+        end,
+        []
+    ]),
+    %% TODO: incorporate Bad responses
+    send_json(Req, {Resp});
+handle_resource_status_req(Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    send_method_not_allowed(Req, "GET,HEAD,POST").
+
+
 handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = PostBody} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 9c1df21b690..ba6bd38ca80 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -53,6 +53,8 @@
 -define(INTERACTIVE_EDIT, interactive_edit).
 -define(REPLICATED_CHANGES, replicated_changes).

+-define(LOG_UNEXPECTED_MSG(Msg), couch_log:warning("[~p:~p:~p/~p]{~p[~p]} Unexpected message: ~w", [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, self(), element(2, process_info(self(), message_queue_len)), Msg])).
+
 -type branch() :: {Key::term(), Value::term(), Tree::term()}.
 -type path() :: {Start::pos_integer(), branch()}.
 -type update_type() :: replicated_changes | interactive_edit.
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index 6a7120f87ef..33c5565219d 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -306,6 +306,10 @@
     {type, counter},
     {desc, <<"number of couch_server LRU operations skipped">>}
 ]}.
+{[couchdb, couch_server, open], [
+  {type, counter},
+  {desc, <<"number of couch_server open operations invoked">>}
+]}.
 {[couchdb, query_server, vdu_rejects], [
     {type, counter},
     {desc, <<"number of rejections by validate_doc_update function">>}
@@ -418,10 +422,39 @@
     {type, counter},
     {desc, <<"number of other requests">>}
 ]}.
+{[couchdb, query_server, js_filter], [
+  {type, counter},
+  {desc, <<"number of JS filter invocations">>}
+]}.
+{[couchdb, query_server, js_filtered_docs], [
+  {type, counter},
+  {desc, <<"number of docs filtered through JS invocations">>}
+]}.
+{[couchdb, query_server, js_filter_error], [
+  {type, counter},
+  {desc, <<"number of JS filter invocation errors">>}
+]}.
 {[couchdb, legacy_checksums], [
     {type, counter},
     {desc, <<"number of legacy checksums found in couch_file instances">>}
 ]}.
+{[couchdb, btree, folds], [
+  {type, counter},
+  {desc, <<"number of couch btree kv fold callback invocations">>}
+]}.
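+%% Note: `folds` above counts kv fold callback invocations, while the
+%% kp_node/kv_node counters below count inner vs leaf btree node reads,
+%% approximating per-operation IO fanout.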
+{[couchdb, btree, kp_node], [
+  {type, counter},
+  {desc, <<"number of couch btree kp_nodes read">>}
+]}.
+{[couchdb, btree, kv_node], [
+  {type, counter},
+  {desc, <<"number of couch btree kv_nodes read">>}
+]}.
+%% CSRT (couch_stats_resource_tracker) stats
+{[couchdb, csrt, delta_missing_t0], [
+  {type, counter},
+  {desc, <<"number of csrt contexts without a proper start time">>}
+]}.
 {[pread, exceed_eof], [
     {type, counter},
     {desc, <<"number of the attempts to read beyond end of db file">>}
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index b974a22eeca..27b5bc18b2e 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->

 get_node(#btree{fd = Fd}, NodePos) ->
     {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, btree, NodeType]),
     {NodeType, NodeList}.

 write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
@@ -1163,6 +1165,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) -
         false ->
             {stop, {PrevKVs, Reds}, Acc};
         true ->
+            couch_stats:increment_counter([couchdb, btree, folds]),
             AssembledKV = assemble(Bt, K, V),
             case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
                 {ok, Acc2} ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2ef89ced3a6..c7afaa4b39f 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -297,6 +297,7 @@ open_doc(Db, IdOrDocInfo) ->
     open_doc(Db, IdOrDocInfo, []).

 open_doc(Db, Id, Options) ->
+    %% TODO: wire in csrt tracking
     increment_stat(Db, [couchdb, database_reads]),
     case open_doc_int(Db, Id, Options) of
         {ok, #doc{deleted = true} = Doc} ->
@@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
 ->
     case lists:member(sys_db, Options) of
         true ->
+            %% TODO: we shouldn't leak resource usage just because it's a sys_db
            ok;
         false ->
             couch_stats:increment_counter(Stat, Count)
diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl
index 6789bfaef05..b5ddee312ef 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -542,6 +542,8 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
         {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)}
     catch
         throw:{os_process_error, {exit_status, 1}} ->
+            %% TODO: wire in csrt tracking
+            couch_stats:increment_counter([couchdb, query_server, js_filter_error]),
            %% batch used too much memory, retry sequentially.
            Fun = fun(JsonDoc) ->
                filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc])
@@ -550,6 +552,12 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
     end.

 filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) ->
+    %% Count usage in the _int version as this can be repeated on OS process errors.
+    %% Pros & cons:
+    %% might not have actually processed `length(JsonDocs)` docs here, but it
+    %% certainly undercounts if we count in `filter_docs/5` above.
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, query_server, js_filter]),
+    couch_stats:increment_counter([couchdb, query_server, js_filtered_docs], length(JsonDocs)),
     [true, Passes] = ddoc_prompt(
         Db,
         DDoc,
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 8f36f1c7f95..72edd0a0028 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -108,6 +108,8 @@ sup_start_link(N) ->
     gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).

 open(DbName, Options) ->
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, couch_server, open]),
     try
         validate_open_or_create(DbName, Options),
         open_int(DbName, Options)
diff --git a/src/couch_stats/src/couch_stats.app.src b/src/couch_stats/src/couch_stats.app.src
index a54fac7349f..de9f00e4e70 100644
--- a/src/couch_stats/src/couch_stats.app.src
+++ b/src/couch_stats/src/couch_stats.app.src
@@ -13,8 +13,12 @@
 {application, couch_stats, [
     {description, "Simple statistics collection"},
     {vsn, git},
-    {registered, [couch_stats_aggregator, couch_stats_process_tracker]},
-    {applications, [kernel, stdlib]},
+    {registered, [
+        couch_stats_aggregator,
+        couch_stats_process_tracker,
+        couch_stats_resource_tracker
+    ]},
+    {applications, [kernel, stdlib, couch_log]},
     {mod, {couch_stats_app, []}},
     {env, []}
 ]}.
diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl
index 29a4024491f..29190e6b003 100644
--- a/src/couch_stats/src/couch_stats.erl
+++ b/src/couch_stats/src/couch_stats.erl
@@ -24,6 +24,12 @@
     update_gauge/2
 ]).

+%% couch_stats_resource_tracker API
+-export([
+    create_context/3,
+    maybe_track_rexi_init_p/1
+]).
+
 -type response() :: ok | {error, unknown_metric} | {error, invalid_metric}.
 -type stat() :: {any(), [{atom(), any()}]}.

@@ -49,6 +55,11 @@ increment_counter(Name) ->

 -spec increment_counter(any(), pos_integer()) -> response().
 increment_counter(Name, Value) ->
+    %% Should maybe_track_local happen before or after notify?
+    %% If after, only currently tracked metrics declared in the app's
+    %% stats_descriptions.cfg will be trackable locally. Pros/cons.
+    %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
+    ok = maybe_track_local_counter(Name, Value),
     case couch_stats_util:get_counter(Name, stats()) of
         {ok, Ctx} -> couch_stats_counter:increment(Ctx, Value);
         {error, Error} -> {error, Error}
@@ -100,6 +111,25 @@ stats() ->
 now_sec() ->
     erlang:monotonic_time(second).

+%% Only potentially track positive increments to counters
+-spec maybe_track_local_counter(any(), any()) -> ok.
+maybe_track_local_counter(Name, Val) when is_integer(Val) andalso Val > 0 ->
+    %%io:format("maybe_track_local[~p]: ~p~n", [Val, Name]),
+    couch_stats_resource_tracker:maybe_inc(Name, Val),
+    ok;
+maybe_track_local_counter(_, _) ->
+    ok.
+
+create_context(From, MFA, Nonce) ->
+    couch_stats_resource_tracker:create_context(From, MFA, Nonce).
+
+maybe_track_rexi_init_p({M, F, _A}) ->
+    Metric = [M, F, spawned],
+    case couch_stats_resource_tracker:should_track(Metric) of
+        true -> increment_counter(Metric);
+        false -> ok
+    end.
+
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").
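With the pieces above in place, a single counter bump fans out to both the global metrics and the per-process resource context. The call path, sketched (the rctx/ets details land in the couch_stats_resource_tracker module added later in this series):

    %% couch_stats:increment_counter(Name, Val)
    %%   -> maybe_track_local_counter(Name, Val)         %% only when Val > 0
    %%        -> couch_stats_resource_tracker:maybe_inc(Name, Val)
    %%             %% maps known metric names onto #rctx counter fields and
    %%             %% bumps them via ets:update_counter for the calling process
    %%   -> couch_stats_counter:increment(Ctx, Val)      %% the existing global metric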
diff --git a/src/couch_stats/src/couch_stats_sup.erl b/src/couch_stats/src/couch_stats_sup.erl index 325372c3e4b..4b4df17e26a 100644 --- a/src/couch_stats/src/couch_stats_sup.erl +++ b/src/couch_stats/src/couch_stats_sup.erl @@ -29,6 +29,7 @@ init([]) -> { {one_for_one, 5, 10}, [ ?CHILD(couch_stats_server, worker), + ?CHILD(couch_stats_resource_tracker, worker), ?CHILD(couch_stats_process_tracker, worker) ] }}. diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg index d12aa0c8480..9ab054bf038 100644 --- a/src/fabric/priv/stats_descriptions.cfg +++ b/src/fabric/priv/stats_descriptions.cfg @@ -26,3 +26,53 @@ {type, counter}, {desc, <<"number of write quorum errors">>} ]}. + + +%% fabric_rpc worker stats +%% TODO: decide on which naming scheme: +%% {[fabric_rpc, get_all_security, spawned], [ +%% {[fabric_rpc, spawned, get_all_security], [ +{[fabric_rpc, get_all_security, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker get_all_security spawns">>} +]}. +{[fabric_rpc, open_doc, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker open_doc spawns">>} +]}. +{[fabric_rpc, all_docs, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker all_docs spawns">>} +]}. +{[fabric_rpc, update_docs, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker update_docs spawns">>} +]}. +{[fabric_rpc, map_view, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker map_view spawns">>} +]}. +{[fabric_rpc, reduce_view, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker reduce_view spawns">>} +]}. +{[fabric_rpc, open_shard, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker open_shard spawns">>} +]}. +{[fabric_rpc, changes, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes spawns">>} +]}. +{[fabric_rpc, changes, processed], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes row invocations">>} +]}. +{[fabric_rpc, changes, returned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes rows returned">>} +]}. +{[fabric_rpc, view, rows_read], [ + {type, counter}, + {desc, <<"number of fabric_rpc view_cb row invocations">>} +]}. diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 9b00d95011c..53b34f41a39 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -493,6 +493,11 @@ view_cb({meta, Meta}, Acc) -> ok = rexi:stream2({meta, Meta}), {ok, Acc}; view_cb({row, Props}, #mrargs{extra = Options} = Acc) -> + %% TODO: distinguish between rows and docs + %% TODO: wire in csrt tracking + %% TODO: distinguish between all_docs vs view call + couch_stats:increment_counter([fabric_rpc, view, rows_read]), + %%couch_stats_resource_tracker:inc(rows_read), % Adding another row ViewRow = fabric_view_row:from_props(Props, Options), ok = rexi:stream2(ViewRow), @@ -529,6 +534,7 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) -> changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}}; changes_enumerator(DocInfo, Acc) -> + couch_stats:increment_counter([fabric_rpc, changes, processed]), #fabric_changes_acc{ db = Db, args = #changes_args{ @@ -569,6 +575,7 @@ changes_enumerator(DocInfo, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}. 
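+%% Note: `changes, processed` (incremented above) counts every doc_info
+%% the enumerator examines, while `changes, returned` (incremented in
+%% changes_row/4 below) counts only rows actually emitted, so a heavily
+%% filtered changes feed should show processed well above returned.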
changes_row(Changes, Docs, DocInfo, Acc) -> + couch_stats:increment_counter([fabric_rpc, changes, returned]), #fabric_changes_acc{db = Db, pending = Pending, epochs = Epochs} = Acc, #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo, {change, [ @@ -667,6 +674,14 @@ clean_stack(S) -> ). set_io_priority(DbName, Options) -> + couch_stats_resource_tracker:set_context_dbname(DbName), + %% TODO: better approach here than using proplists? + case proplists:get_value(user_ctx, Options) of + undefined -> + ok; + #user_ctx{name = UserName} -> + couch_stats_resource_tracker:set_context_username(UserName) + end, case lists:keyfind(io_priority, 1, Options) of {io_priority, Pri} -> erlang:put(io_priority, Pri); diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 4acb65c739a..63c70e27068 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -139,6 +139,10 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) -> receive {Ref, {ok, Db}} -> {ok, Db}; + %% TODO: switch to using rexi_utils:extract_delta + {Ref, {ok, Db}, {delta, Delta}} -> + couch_stats_resource_tracker:accumulate_delta(Delta), + {ok, Db}; {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> throw(Error); {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> diff --git a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl index 07e6b1d4220..c7a36fbe342 100644 --- a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl +++ b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl @@ -263,6 +263,8 @@ rpc_update_doc(DbName, Doc, Opts) -> Reply = test_util:wait(fun() -> receive {Ref, Reply} -> + Reply; + {Ref, Reply, {delta, _}} -> Reply after 0 -> wait diff --git a/src/fabric/test/eunit/fabric_rpc_tests.erl b/src/fabric/test/eunit/fabric_rpc_tests.erl index 16bb66badac..c402affbab0 100644 --- a/src/fabric/test/eunit/fabric_rpc_tests.erl +++ b/src/fabric/test/eunit/fabric_rpc_tests.erl @@ -101,7 +101,16 @@ t_no_config_db_create_fails_for_shard_rpc(DbName) -> receive Resp0 -> Resp0 end, - ?assertMatch({Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, Resp). + case couch_stats_resource_tracker:is_enabled() of + true -> + ?assertMatch( %% allow for {Ref, {rexi_EXIT, error}, {delta, D}} + {Ref, {'rexi_EXIT', {{error, missing_target}, _}}, _}, + Resp); + false -> + ?assertMatch( + {Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, + Resp) + end. t_db_create_with_config(DbName) -> MDbName = mem3:dbname(DbName), diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index 0928ae19311..e11c6941632 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -245,9 +245,11 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu Result = case mango_idx:def(Idx) of all_docs -> + couch_stats:increment_counter([mango_cursor, view, all_docs]), CB = fun ?MODULE:handle_all_docs_message/2, fabric:all_docs(Db, DbOpts, CB, Cursor, Args); _ -> + couch_stats:increment_counter([mango_cursor, view, idx]), CB = fun ?MODULE:handle_message/2, % Normal view DDoc = ddocid(Idx), diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl index 42031b7569d..d8d2c913c98 100644 --- a/src/mango/src/mango_selector.erl +++ b/src/mango/src/mango_selector.erl @@ -50,6 +50,7 @@ normalize(Selector) -> % This assumes that the Selector has been normalized. % Returns true or false. 
match(Selector, D) -> + %% TODO: wire in csrt tracking couch_stats:increment_counter([mango, evaluate_selector]), match_int(Selector, D). diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 70fc797dad6..81b4fec2703 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -378,20 +378,34 @@ rexi_call(Node, MFA, Timeout) -> Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), Ref = rexi:cast(Node, self(), MFA, [sync]), try - receive - {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error); - {rexi_DOWN, Mon, _, Reason} -> - erlang:error({rexi_DOWN, {Node, Reason}}) - after Timeout -> - erlang:error(timeout) - end + wait_message(Node, Ref, Mon, Timeout) after rexi_monitor:stop(Mon) end. +wait_message(Node, Ref, Mon, Timeout) -> + receive + Msg -> + process_raw_message(Msg, Node, Ref, Mon, Timeout) + after Timeout -> + erlang:error(timeout) + end. + +process_raw_message(Msg0, Node, Ref, Mon, Timeout) -> + {Msg, Delta} = rexi_utils:extract_delta(Msg0), + couch_stats_resource_tracker:accumulate_delta(Delta), + case Msg of + {Ref, {ok, Reply}} -> + Reply; + {Ref, Error} -> + erlang:error(Error); + {rexi_DOWN, Mon, _, Reason} -> + erlang:error({rexi_DOWN, {Node, Reason}}); + Other -> + ?LOG_UNEXPECTED_MSG(Other), + wait_message(Node, Ref, Mon, Timeout) + end. + get_or_create_db(DbName, Options) -> mem3_util:get_or_create_db_int(DbName, Options). diff --git a/src/rexi/include/rexi.hrl b/src/rexi/include/rexi.hrl index a2d86b2ab54..a962f306917 100644 --- a/src/rexi/include/rexi.hrl +++ b/src/rexi/include/rexi.hrl @@ -11,6 +11,7 @@ % the License. -record(error, { + delta, timestamp, reason, mfa, diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 02d3a9e5559..3f93758bc82 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -104,7 +104,7 @@ kill_all(NodeRefs) when is_list(NodeRefs) -> -spec reply(any()) -> any(). reply(Reply) -> {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, Reply}). + erlang:send(Caller, maybe_add_delta({Ref, Reply})). %% Private function used by stream2 to initialize the stream. Message is of the %% form {OriginalRef, {self(),reference()}, Reply}, which enables the @@ -188,7 +188,7 @@ stream2(Msg, Limit, Timeout) -> {ok, Count} -> put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, self(), Msg}), + erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})), ok catch throw:timeout -> @@ -222,7 +222,11 @@ stream_ack(Client) -> %% ping() -> {Caller, _} = get(rexi_from), - erlang:send(Caller, {rexi, '$rexi_ping'}). + %% It is essential ping/0 includes deltas as otherwise long running + %% filtered queries will be silent on usage until they finally return + %% a row or no results. This delay is proportional to the database size, + %% so instead we make sure ping/0 keeps live stats flowing. + erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})). aggregate_server_queue_len() -> rexi_server_mon:aggregate_queue_len(rexi_server). @@ -282,3 +286,11 @@ drain_acks(Count) -> after 0 -> {ok, Count} end. + +maybe_add_delta(T) -> + case couch_stats_resource_tracker:is_enabled() of + false -> + T; + true -> + rexi_utils:add_delta(T, rexi_utils:get_delta()) + end. diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl index 7fe66db71d4..72f0985df80 100644 --- a/src/rexi/src/rexi_monitor.erl +++ b/src/rexi/src/rexi_monitor.erl @@ -35,6 +35,7 @@ start(Procs) -> %% messages from our mailbox. -spec stop(pid()) -> ok. 
 stop(MonitoringPid) ->
+    unlink(MonitoringPid),
     MonitoringPid ! {self(), shutdown},
     flush_down_messages().
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index b2df65c7193..729979c53bf 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -102,12 +102,12 @@ handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) ->
     case find_worker(Ref, Workers) of
         #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job ->
             case Error of
-                #error{reason = {_Class, Reason}, stack = Stack} ->
-                    notify_caller({CPid, CRef}, {Reason, Stack}),
+                #error{reason = {_Class, Reason}, stack = Stack, delta = Delta} ->
+                    notify_caller({CPid, CRef}, {Reason, Stack}, Delta),
                     St1 = save_error(Error, St),
                     {noreply, remove_job(Job, St1)};
                 _ ->
-                    notify_caller({CPid, CRef}, Error),
+                    notify_caller({CPid, CRef}, Error, undefined),
                     {noreply, remove_job(Job, St)}
             end;
         false ->
@@ -134,15 +134,18 @@ init_p(From, MFA) ->
     string() | undefined
 ) -> any().
 init_p(From, {M, F, A}, Nonce) ->
+    MFA = {M, F, length(A)},
     put(rexi_from, From),
-    put('$initial_call', {M, F, length(A)}),
+    put('$initial_call', MFA),
     put(nonce, Nonce),
     try
+        couch_stats_resource_tracker:create_context(From, MFA, Nonce),
+        couch_stats:maybe_track_rexi_init_p(MFA),
         apply(M, F, A)
     catch
        exit:normal ->
            ok;
        Class:Reason:Stack0 ->
            %% NB: the context must stay alive here so make_delta/0 below can
            %% read it; teardown is handled once, in the `after` clause.
            Stack = clean_stack(Stack0),
            {ClientPid, _ClientRef} = From,
            couch_log:error(
                ...
                ]
            ),
            exit(#error{
+                delta = couch_stats_resource_tracker:make_delta(),
                timestamp = os:timestamp(),
                reason = {Class, Reason},
                mfa = {M, F, A},
                nonce = Nonce,
                stack = Stack
            })
+    after
+        couch_stats_resource_tracker:destroy_context()
     end.

 %% internal

@@ -200,8 +208,14 @@ find_worker(Ref, Tab) ->
         [Worker] -> Worker
     end.

-notify_caller({Caller, Ref}, Reason) ->
-    rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
+notify_caller({Caller, Ref}, Reason, Delta) ->
+    Msg = case couch_stats_resource_tracker:is_enabled() of
+        true ->
+            {Ref, {rexi_EXIT, Reason}, {delta, Delta}};
+        false ->
+            {Ref, {rexi_EXIT, Reason}}
+    end,
+    rexi_utils:send(Caller, Msg).

 kill_worker(FromRef, #st{clients = Clients} = St) ->
     case find_worker(FromRef, Clients) of
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index 146d0238ac1..512932d2730 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -13,6 +13,7 @@
 -module(rexi_utils).

 -export([server_pid/1, send/2, recv/6]).
+-export([add_delta/2, extract_delta/1, get_delta/0]).

 %% @doc Return a rexi_server id for the given node.
 server_id(Node) ->
@@ -60,6 +61,16 @@ process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->

 process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
     receive
+ +add_delta({A}, Delta) -> {A, Delta}; +add_delta({A, B}, Delta) -> {A, B, Delta}; +add_delta({A, B, C}, Delta) -> {A, B, C, Delta}; +add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta}; +add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta}; +add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta}; +add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta}; +add_delta(T, _Delta) -> T. + +extract_delta({A, {delta, Delta}}) -> {{A}, Delta}; +extract_delta({A, B, {delta, Delta}}) -> {{A, B}, Delta}; +extract_delta({A, B, C, {delta, Delta}}) -> {{A, B, C}, Delta}; +extract_delta({A, B, C, D, {delta, Delta}}) -> {{A, B, C, D}, Delta}; +extract_delta({A, B, C, D, E, {delta, Delta}}) -> {{A, B, C, D, E}, Delta}; +extract_delta({A, B, C, D, E, F, {delta, Delta}}) -> {{A, B, C, D, E, F}, Delta}; +extract_delta({A, B, C, D, E, F, G, {delta, Delta}}) -> {{A, B, C, D, E, F, G}, Delta}; +extract_delta(T) -> {T, undefined}. + +get_delta() -> + {delta, couch_stats_resource_tracker:make_delta()}. From e67690a6220bffd53174f8acdba28fdcde3203f9 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 29 Jul 2024 16:14:58 -0700 Subject: [PATCH 02/15] Rework CSRT post experimentation --- src/chttpd/src/chttpd.erl | 1 + .../src/couch_stats_resource_tracker.erl | 886 ++++++++++++++++++ src/rexi/src/rexi_server.erl | 2 +- 3 files changed, 888 insertions(+), 1 deletion(-) create mode 100644 src/couch_stats/src/couch_stats_resource_tracker.erl diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 9642b208957..713719b813a 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -418,6 +418,7 @@ handle_req_after_auth(HandlerKey, HttpReq) -> possibly_hack(HttpReq), fun chttpd_auth_request:authorize_request/1 ), + couch_stats_resource_tracker:set_context_username(AuthorizedReq), {AuthorizedReq, HandlerFun(AuthorizedReq)} catch ErrorType:Error:Stack -> diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl new file mode 100644 index 00000000000..2e3915371e4 --- /dev/null +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -0,0 +1,886 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_stats_resource_tracker). + +-behaviour(gen_server). + +-export([ + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). + +%% PidRef API +-export([ + get_pid_ref/0, + set_pid_ref/1, + create_pid_ref/0, + close_pid_ref/0, close_pid_ref/1 +]). + +%% Context API +-export([ + create_resource/1, + create_context/5, + create_coordinator_context/2, + create_worker_context/3, + destroy_context/0, destroy_context/1, + + get_resource/0, get_resource/1, get_resource_raw/1, + + set_context_dbname/1, set_context_dbname/2, + set_context_handler_fun/1, set_context_handler_fun/2, + set_context_username/1, set_context_username/2 +]). 
+
+%% stats collection api
+-export([
+    is_enabled/0,
+
+    inc/1, inc/2,
+    maybe_inc/2,
+    accumulate_delta/1,
+    make_delta/0,
+
+    ioq_called/0,
+
+    should_track/1
+]).
+
+%% aggregate query api
+-export([
+    active/0,
+    active_coordinators/0,
+    active_workers/0,
+
+    count_by/1,
+    group_by/2, group_by/3,
+    sorted/1,
+    sorted_by/1, sorted_by/2, sorted_by/3,
+
+    find_by_pid/1,
+    find_by_pidref/1,
+    find_workers_by_pidref/1
+]).
+
+%% Process lifetime reporting api
+-export([
+    log_process_lifetime_report/1,
+    is_logging_enabled/0,
+    logging_enabled/0,
+    should_log/1, should_log/2,
+    tracker/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Module pdict markers
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+-define(TRACKER_PID, csrt_tracker). %% tracker pid
+
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+-define(DB_OPEN_DOC, docs_read).
+-define(DB_OPEN, db_open).
+-define(COUCH_SERVER_OPEN, db_open).
+-define(COUCH_BT_GET_KP_NODE, get_kp_node).
+-define(COUCH_BT_GET_KV_NODE, get_kv_node).
+-define(COUCH_JS_FILTER, js_filter).
+-define(COUCH_JS_FILTER_ERROR, js_filter_error).
+-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs).
+-define(IOQ_CALLS, ioq_calls).
+-define(ROWS_READ, rows_read).
+
+%% TODO: overlap between this and couch btree fold invocations
+%% TODO: need some way to distinguish folds on views vs find vs all_docs
+-define(FRPC_CHANGES_ROW, changes_processed).
+-define(FRPC_CHANGES_RETURNED, changes_returned).
+
+-record(st, {}).
+
+-record(rctx, {
+    %% Metadata
+    started_at = tnow(),
+    updated_at = tnow(),
+    pid_ref,
+    mfa,
+    nonce,
+    from,
+    type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
+    dbname,
+    username,
+    path,
+
+    %% Stats counters
+    db_open = 0,
+    docs_read = 0,
+    rows_read = 0,
+    changes_processed = 0,
+    changes_returned = 0,
+    ioq_calls = 0,
+    io_bytes_read = 0,
+    io_bytes_written = 0,
+    js_evals = 0,
+    js_filter = 0,
+    js_filter_error = 0,
+    js_filtered_docs = 0,
+    mango_eval_match = 0,
+    %% TODO: switch record definitions to be macro based, eg:
+    %% ?COUCH_BT_GET_KP_NODE = 0,
+    get_kv_node = 0,
+    get_kp_node = 0
+}).
+
+%%
+%% Public API
+%%
+
+%%
+%% PidRef operations
+%%
+
+get_pid_ref() ->
+    get(?PID_REF).
+
+set_pid_ref(PidRef) ->
+    erlang:put(?PID_REF, PidRef),
+    PidRef.
+
+create_pid_ref() ->
+    case get_pid_ref() of
+        undefined ->
+            ok;
+        PidRef0 ->
+            %% TODO: what to do when it already exists?
+            throw({epidexist, PidRef0}),
+            close_pid_ref(PidRef0)
+    end,
+    PidRef = {self(), make_ref()},
+    set_pid_ref(PidRef),
+    PidRef.
+
+close_pid_ref() ->
+    close_pid_ref(get_pid_ref()).
+
+%%close_pid_ref(undefined) ->
+%%    undefined;
+close_pid_ref(_PidRef) ->
+    erase(?PID_REF).
+
+get_resource() ->
+    get_resource(get_pid_ref()).
+
+get_resource(undefined) ->
+    undefined;
+get_resource(PidRef) ->
+    catch get_resource_raw(PidRef).
+
+get_resource_raw(undefined) ->
+    undefined;
+get_resource_raw(PidRef) ->
+    case ets:lookup(?MODULE, PidRef) of
+        [#rctx{}=Rctx] ->
+            Rctx;
+        [] ->
+            undefined
+    end.
+
+%% monotonic time now in milliseconds
+tnow() ->
+    erlang:monotonic_time(millisecond).
+
+is_enabled() ->
+    config:get_boolean(?MODULE_STRING, "enabled", true).
+
+%%
+%% Aggregate query API
+%%
+
+active() -> active_int(all).
+active_coordinators() -> active_int(coordinators).
+active_workers() -> active_int(workers).
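+
+%% eg (remsh), output illustrative:
+%%   > couch_stats_resource_tracker:active().
+%%   [#{type => <<"coordinator:GET:init">>, dbname => <<"db1">>,
+%%      docs_read => 12, ioq_calls => 37, ...}]
+%% active_coordinators/0 and active_workers/0 narrow the same snapshot
+%% by context type.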
+ + +active_int(coordinators) -> + select_by_type(coordinators); +active_int(workers) -> + select_by_type(workers); +active_int(all) -> + lists:map(fun to_json/1, ets:tab2list(?MODULE)). + + +select_by_type(coordinators) -> + ets:select(couch_stats_resource_tracker, + [{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]); +select_by_type(workers) -> + ets:select(couch_stats_resource_tracker, + [{#rctx{type = {worker,'_','_'}, _ = '_'}, [], ['$_']}]); +select_by_type(all) -> + lists:map(fun to_json/1, ets:tab2list(?MODULE)). + +find_by_pid(Pid) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})]. + +find_by_pidref(PidRef) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref=PidRef, _ = '_'})]. + +find_workers_by_pidref(PidRef) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{from=PidRef, _ = '_'})]. + +field(#rctx{pid_ref=Val}, pid_ref) -> Val; +%% NOTE: Pros and cons to doing these convert functions here +%% Ideally, this would be done later so as to prefer the core data structures +%% as long as possible, but we currently need the output of this function to +%% be jiffy:encode'able. The tricky bit is dynamically encoding the group_by +%% structure provided by the caller of *_by aggregator functions below. +%% For now, we just always return jiffy:encode'able data types. +field(#rctx{mfa=Val}, mfa) -> convert_mfa(Val); +field(#rctx{nonce=Val}, nonce) -> Val; +field(#rctx{from=Val}, from) -> Val; +field(#rctx{type=Val}, type) -> convert_type(Val); +field(#rctx{dbname=Val}, dbname) -> Val; +field(#rctx{username=Val}, username) -> Val; +field(#rctx{path=Val}, path) -> Val; +field(#rctx{db_open=Val}, db_open) -> Val; +field(#rctx{docs_read=Val}, docs_read) -> Val; +field(#rctx{rows_read=Val}, rows_read) -> Val; +field(#rctx{changes_processed=Val}, changes_processed) -> Val; +field(#rctx{changes_returned=Val}, changes_returned) -> Val; +field(#rctx{ioq_calls=Val}, ioq_calls) -> Val; +field(#rctx{io_bytes_read=Val}, io_bytes_read) -> Val; +field(#rctx{io_bytes_written=Val}, io_bytes_written) -> Val; +field(#rctx{js_evals=Val}, js_evals) -> Val; +field(#rctx{js_filter=Val}, js_filter) -> Val; +field(#rctx{js_filter_error=Val}, js_filter_error) -> Val; +field(#rctx{js_filtered_docs=Val}, js_filtered_docs) -> Val; +field(#rctx{mango_eval_match=Val}, mango_eval_match) -> Val; +field(#rctx{get_kv_node=Val}, get_kv_node) -> Val; +field(#rctx{get_kp_node=Val}, get_kp_node) -> Val. + +curry_field(Field) -> + fun(Ele) -> field(Ele, Field) end. + +count_by(KeyFun) -> + group_by(KeyFun, fun(_) -> 1 end). + +group_by(KeyFun, ValFun) -> + group_by(KeyFun, ValFun, fun erlang:'+'/2). + +%% eg: group_by(mfa, docs_read). +%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls). +%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls). +%% eg: group_by([username, dbname, mfa], docs_read). +%% eg: group_by([username, dbname, mfa], ioq_calls). +%% eg: group_by([username, dbname, mfa], js_filters). 
+group_by(KeyL, ValFun, AggFun) when is_list(KeyL) ->
+    KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end,
+    group_by(KeyFun, ValFun, AggFun);
+group_by(Key, ValFun, AggFun) when is_atom(Key) ->
+    group_by(curry_field(Key), ValFun, AggFun);
+group_by(KeyFun, Val, AggFun) when is_atom(Val) ->
+    group_by(KeyFun, curry_field(Val), AggFun);
+group_by(KeyFun, ValFun, AggFun) ->
+    FoldFun = fun(Ele, Acc) ->
+        Key = KeyFun(Ele),
+        Val = ValFun(Ele),
+        CurrVal = maps:get(Key, Acc, 0),
+        NewVal = AggFun(CurrVal, Val),
+        %% TODO: should we skip here? how to make this optional?
+        case NewVal > 0 of
+            true ->
+                maps:put(Key, NewVal, Acc);
+            false ->
+                Acc
+        end
+    end,
+    ets:foldl(FoldFun, #{}, ?MODULE).
+
+%% Sorts largest first
+sorted(Map) when is_map(Map) ->
+    lists:sort(fun({_K1, A}, {_K2, B}) -> B < A end, maps:to_list(Map)).
+
+shortened(L) ->
+    lists:sublist(L, 10).
+
+%% eg: sorted_by([username, dbname, mfa], ioq_calls)
+%% eg: sorted_by([dbname, mfa], doc_reads)
+sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))).
+sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))).
+sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, AggFun))).
+
+%%
+%% Conversion API for outputting JSON
+%%
+
+convert_mfa(MFA) when is_list(MFA) ->
+    list_to_binary(MFA);
+convert_mfa({M0, F0, A0}) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    A = integer_to_binary(A0),
+    <<M/binary, ":", F/binary, "/", A/binary>>;
+convert_mfa(null) ->
+    null;
+convert_mfa(undefined) ->
+    null.
+
+convert_type(Atom) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+convert_type({coordinator, Verb0, Atom0}) when is_atom(Atom0) ->
+    Verb = atom_to_binary(Verb0),
+    Atom = atom_to_binary(Atom0),
+    <<"coordinator:", Verb/binary, ":", Atom/binary>>;
+convert_type({coordinator, Verb0, Path0}) ->
+    Verb = atom_to_binary(Verb0),
+    Path = list_to_binary(Path0),
+    <<"coordinator:", Verb/binary, ":", Path/binary>>;
+convert_type({worker, M0, F0}) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    <<"worker:", M/binary, ":", F/binary>>;
+convert_type(null) ->
+    null;
+convert_type(undefined) ->
+    null.
+
+convert_pidref({Parent0, ParentRef0}) ->
+    Parent = convert_pid(Parent0),
+    ParentRef = convert_ref(ParentRef0),
+    <<Parent/binary, ":", ParentRef/binary>>;
+convert_pidref(null) ->
+    null;
+convert_pidref(undefined) ->
+    null.
+
+convert_pid(Pid) when is_pid(Pid) ->
+    ?l2b(pid_to_list(Pid)).
+
+convert_ref(Ref) when is_reference(Ref) ->
+    ?l2b(ref_to_list(Ref)).
+
+to_json(#rctx{}=Rctx) ->
+    #rctx{
+        updated_at = TP,
+        started_at = TInit,
+        pid_ref = PidRef,
+        mfa = MFA,
+        nonce = Nonce,
+        from = From,
+        dbname = DbName,
+        username = UserName,
+        db_open = DbOpens,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        js_filter = JSFilters,
+        js_filter_error = JSFilterErrors,
+        js_filtered_docs = JSFilteredDocs,
+        type = Type,
+        get_kp_node = KpNodes,
+        get_kv_node = KvNodes,
+        changes_returned = ChangesReturned,
+        ioq_calls = IoqCalls
+    } = Rctx,
+
+    #{
+        updated_at => TP,
+        started_at => TInit,
+        pid_ref => convert_pidref(PidRef),
+        mfa => convert_mfa(MFA),
+        nonce => Nonce,
+        from => convert_pidref(From),
+        dbname => DbName,
+        username => UserName,
+        db_open => DbOpens,
+        docs_read => DocsRead,
+        js_filter => JSFilters,
+        js_filter_error => JSFilterErrors,
+        js_filtered_docs => JSFilteredDocs,
+        rows_read => RowsRead,
+        type => convert_type(Type),
+        kp_nodes => KpNodes,
+        kv_nodes => KvNodes,
+        changes_returned => ChangesReturned,
+        ioq_calls => IoqCalls
+    }.
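+
+%% eg (remsh): top (at most 10) {username, dbname} pairs by IOQ calls, as
+%% returned by sorted_by/2 above; output illustrative:
+%%   > couch_stats_resource_tracker:sorted_by([username, dbname], ioq_calls).
+%%   [{{<<"adm">>, <<"db1">>}, 4242}, {{<<"adm">>, <<"db2">>}, 17}]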
+ +%% +%% Context lifecycle API +%% + +create_resource(#rctx{} = Rctx) -> + catch ets:insert(?MODULE, Rctx). + +create_worker_context(From, {M,F,_A} = MFA, Nonce) -> + case is_enabled() of + true -> + create_context(MFA, {worker, M, F}, null, From, Nonce); + false -> + false + end. + +create_coordinator_context(#httpd{} = Req, Path0) -> + case is_enabled() of + true -> + #httpd{ + method = Verb, + nonce = Nonce + %%path_parts = Parts + } = Req, + %%Path = list_to_binary([$/ | io_lib:format("~p", [Parts])]), + Path = list_to_binary([$/ | Path0]), + Type = {coordinator, Verb, init}, + create_context(null, Type, Path, null, Nonce); + false -> + false + end. + +create_context(MFA, Type, Path, From, Nonce) -> + PidRef = create_pid_ref(), + Rctx = #rctx{ + from = From, + pid_ref = PidRef, + mfa = MFA, + nonce = Nonce, + path = Path, + type = Type + }, + erlang:put(?DELTA_TZ, Rctx), + create_resource(Rctx), + track(Rctx), + PidRef. + +set_context_dbname(DbName) -> + set_context_dbname(DbName, get_pid_ref()). + +set_context_dbname(_, undefined) -> + ok; +set_context_dbname(DbName, PidRef) -> + is_enabled() andalso update_element(PidRef, [{#rctx.dbname, DbName}]). + +set_context_handler_fun(Fun) when is_function(Fun) -> + set_context_handler_fun(Fun, get_pid_ref()). +set_context_handler_fun(_, undefined) -> + ok; +set_context_handler_fun(Fun, PidRef) when is_function(Fun) -> + case is_enabled() of + false -> + ok; + true -> + FunName = erlang:fun_to_list(Fun), + #rctx{type={coordinator, Verb, _}} = get_resource(), + Update = [{#rctx.type, {coordinator, Verb, FunName}}], + update_element(PidRef, Update) + end. + +set_context_username(null) -> + ok; +set_context_username(undefined) -> + ok; +set_context_username(User) -> + set_context_username(User, get_pid_ref()). + +set_context_username(null, _) -> + ok; +set_context_username(_, undefined) -> + ok; +set_context_username(#httpd{user_ctx = Ctx}, PidRef) -> + set_context_username(Ctx, PidRef); +set_context_username(#user_ctx{name = Name}, PidRef) -> + set_context_username(Name, PidRef); +set_context_username(UserName, PidRef) -> + io:format("SETTING USERNAME TO: ~p~n", [UserName]), + is_enabled() andalso update_element(PidRef, [{#rctx.username, UserName}]). + +destroy_context() -> + destroy_context(get_pid_ref()). + +destroy_context(undefined) -> + ok; +destroy_context({_, _} = PidRef) -> + stop_tracker(get_tracker()), + close_pid_ref(PidRef), + ok. + +%% Stat collection API + +inc(Key) -> + inc(Key, 1). 
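+
+%% eg: the worker-side lifecycle as wired up by rexi_server:init_p/3
+%% (names from this module; values illustrative):
+%%   PidRef = create_worker_context(From, {fabric_rpc, open_doc, 3}, Nonce),
+%%   set_context_dbname(DbName, PidRef),
+%%   %% ... work happens, counters accrue via inc/1,2 below ...
+%%   destroy_context(PidRef)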
+
+%% TODO: inc(io_bytes_read, N) ->
+%% TODO: inc(io_bytes_written, N) ->
+%% TODO: inc(js_evals, N) ->
+inc(?DB_OPEN, N) ->
+    update_counter(#rctx.?DB_OPEN, N);
+inc(?ROWS_READ, N) ->
+    update_counter(#rctx.?ROWS_READ, N);
+inc(?FRPC_CHANGES_RETURNED, N) ->
+    update_counter(#rctx.?FRPC_CHANGES_RETURNED, N);
+inc(?IOQ_CALLS, N) ->
+    update_counter(#rctx.?IOQ_CALLS, N);
+inc(?COUCH_JS_FILTER, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTER, N);
+inc(?COUCH_JS_FILTER_ERROR, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N);
+inc(?COUCH_JS_FILTERED_DOCS, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N);
+inc(?MANGO_EVAL_MATCH, N) ->
+    update_counter(#rctx.?MANGO_EVAL_MATCH, N);
+inc(?DB_OPEN_DOC, N) ->
+    update_counter(#rctx.?DB_OPEN_DOC, N);
+inc(?FRPC_CHANGES_ROW, N) ->
+    update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read
+inc(?COUCH_BT_GET_KP_NODE, N) ->
+    update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N);
+inc(?COUCH_BT_GET_KV_NODE, N) ->
+    update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N);
+inc(_, _) ->
+    %% inc needs to allow unknown types to pass for accumulate_delta to handle
+    %% updates from nodes with newer data formats
+    0.
+
+maybe_inc([mango, evaluate_selector], Val) ->
+    inc(?MANGO_EVAL_MATCH, Val);
+maybe_inc([couchdb, database_reads], Val) ->
+    inc(?DB_OPEN_DOC, Val);
+maybe_inc([fabric_rpc, changes, processed], Val) ->
+    inc(?FRPC_CHANGES_ROW, Val);
+maybe_inc([fabric_rpc, changes, returned], Val) ->
+    inc(?FRPC_CHANGES_RETURNED, Val);
+maybe_inc([fabric_rpc, view, rows_read], Val) ->
+    inc(?ROWS_READ, Val);
+maybe_inc([couchdb, couch_server, open], Val) ->
+    inc(?DB_OPEN, Val);
+maybe_inc([couchdb, btree, kp_node], Val) ->
+    inc(?COUCH_BT_GET_KP_NODE, Val);
+maybe_inc([couchdb, btree, kv_node], Val) ->
+    inc(?COUCH_BT_GET_KV_NODE, Val);
+maybe_inc([couchdb, query_server, js_filter_error], Val) ->
+    inc(?COUCH_JS_FILTER_ERROR, Val);
+maybe_inc([couchdb, query_server, js_filter], Val) ->
+    inc(?COUCH_JS_FILTER, Val);
+maybe_inc([couchdb, query_server, js_filtered_docs], Val) ->
+    inc(?COUCH_JS_FILTERED_DOCS, Val);
+maybe_inc(_Metric, _Val) ->
+    %%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]),
+    0.
+
+%% TODO: update stats_descriptions.cfg for relevant apps
+should_track([fabric_rpc, all_docs, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, changes, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, changes, processed]) ->
+    is_enabled();
+should_track([fabric_rpc, changes, returned]) ->
+    is_enabled();
+should_track([fabric_rpc, map_view, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, reduce_view, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, get_all_security, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, open_doc, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, update_docs, spawned]) ->
+    is_enabled();
+should_track([fabric_rpc, open_shard, spawned]) ->
+    is_enabled();
+should_track([mango_cursor, view, all_docs]) ->
+    is_enabled();
+should_track([mango_cursor, view, idx]) ->
+    is_enabled();
+should_track(_Metric) ->
+    %%io:format("SKIPPING METRIC: ~p~n", [Metric]),
+    false.
+
+ioq_called() ->
+    is_enabled() andalso inc(ioq_calls).
+
+accumulate_delta(Delta) when is_map(Delta) ->
+    %% TODO: switch to creating a batch of updates to invoke a single
+    %% update_counter rather than sequentially invoking it for each field
+    is_enabled() andalso maps:foreach(fun inc/2, Delta);
+accumulate_delta(undefined) ->
+    ok.
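+
+%% eg: a delta as produced by make_delta/0 below and folded back into the
+%% caller's rctx by accumulate_delta/1 above; only positive, non-zero
+%% fields survive, plus the elapsed time in ms (values illustrative):
+%%   #{docs_read => 3, get_kv_node => 2, ioq_calls => 7, dt => 12}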
+
+make_delta() ->
+    TA = case get(?DELTA_TA) of
+        undefined ->
+            %% Need to handle this better, can't just make a new T0 at T' as
+            %% the timestamps will be identical causing a divide by zero error.
+            %%
+            %% Realistically need to ensure that all invocations of database
+            %% operations sets T0 appropriately. Perhaps it's possible to do
+            %% this in the couch_db:open chain, and then similarly, in
+            %% couch_server, and uhhhh... couch_file, and...
+            %%
+            %% I think we need some type of approach for establishing a T0 that
+            %% doesn't result in outrageous deltas. For now zero out the
+            %% microseconds field, or subtract a second on the off chance that
+            %% microseconds is zero. I'm not up to date on the latest Erlang time
+            %% libraries and don't remember how to easily get an
+            %% `os:timestamp()` out of now() - 100ms or some such.
+            %%
+            %% I think it's unavoidable that we'll have some codepaths that do
+            %% not properly instantiate the T0 at spawn resulting in needing to
+            %% do some type of "time warp" or ignoring the timing collection
+            %% entirely. Perhaps if we hoisted out the stats collection into
+            %% the primary flow of the database and funnel that through all the
+            %% function clauses we could then utilize Dialyzer to statically
+            %% analyze and assert all code paths that invoke database
+            %% operations have properly instantiated a T0 at the appropriate
+            %% start time such that we don't have to "fudge" deltas with a
+            %% missing start point, but we're a long ways from that happening
+            %% so I feel it necessary to address the NULL start time.
+
+            %% Track how often we fail to initiate T0 correctly
+            %% Perhaps somewhat naughty we're incrementing stats from within
+            %% couch_stats itself? Might need to handle this differently
+            %% TODO: determine appropriate course of action here
+            %% io:format("~n**********MISSING STARTING DELTA************~n~n", []),
+            couch_stats:increment_counter(
+                [couchdb, csrt, delta_missing_t0]),
+                %%[couch_stats_resource_tracker, delta_missing_t0]),
+
+            case erlang:get(?DELTA_TZ) of
+                undefined ->
+                    TA0 = make_delta_base(),
+                    %% TODO: handle missing deltas, otherwise divide by zero
+                    set_delta_a(TA0),
+                    TA0;
+                TA0 ->
+                    TA0
+            end;
+        #rctx{} = TA0 ->
+            TA0
+    end,
+    TB = get_resource(),
+    Delta = make_delta(TA, TB),
+    set_delta_a(TB),
+    Delta.
+
+make_delta(#rctx{}=TA, #rctx{}=TB) ->
+    Delta = #{
+        docs_read => TB#rctx.docs_read - TA#rctx.docs_read,
+        js_filter => TB#rctx.js_filter - TA#rctx.js_filter,
+        js_filter_error => TB#rctx.js_filter_error - TA#rctx.js_filter_error,
+        js_filtered_docs => TB#rctx.js_filtered_docs - TA#rctx.js_filtered_docs,
+        rows_read => TB#rctx.rows_read - TA#rctx.rows_read,
+        changes_returned => TB#rctx.changes_returned - TA#rctx.changes_returned,
+        get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node,
+        get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node,
+        db_open => TB#rctx.db_open - TA#rctx.db_open,
+        ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls,
+        dt => TB#rctx.updated_at - TA#rctx.updated_at
+    },
+    %% TODO: reevaluate this decision
+    %% Only return non zero (and also positive) delta fields
+    maps:filter(fun(_K,V) -> V > 0 end, Delta);
+make_delta(_, #rctx{}) ->
+    #{error => missing_beg_rctx};
+make_delta(#rctx{}, _) ->
+    #{error => missing_fin_rctx}.
+
+%% TODO: what to do when PidRef=undefined?
+make_delta_base(PidRef) -> + %% TODO: extract user_ctx and db/shard from request + Now = tnow(), + #rctx{ + pid_ref = PidRef, + %% TODO: confirm this subtraction works + started_at = Now - 100, %% give us 100ms rewind time for missing T0 + updated_at = Now + }. + +make_delta_base() -> + make_delta_base(get_pid_ref()). + +set_delta_a(TA) -> + erlang:put(?DELTA_TA, TA). + +update_counter(Field, Count) -> + is_enabled() andalso update_counter(get_pid_ref(), Field, Count). + +update_counter(undefined, _Field, _Count) -> + ok; +update_counter({_Pid,_Ref}=PidRef, Field, Count) -> + %% TODO: mem3 crashes without catch, why do we lose the stats table? + is_enabled() andalso catch ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}). + +update_element(undefined, _Update) -> + ok; +update_element({_Pid,_Ref}=PidRef, Update) -> + %% TODO: should we take any action when the update fails? + is_enabled() andalso catch ets:update_element(?MODULE, PidRef, Update). + +%% Process lifetime logging api + +track(#rctx{pid_ref=PidRef}) -> + case get_tracker() of + undefined -> + Pid = spawn(?MODULE, tracker, [PidRef]), + put_tracker(Pid), + Pid; + Pid when is_pid(Pid) -> + Pid + end. + +tracker({Pid, _Ref}=PidRef) -> + MonRef = erlang:monitor(process, Pid), + receive + stop -> + %% TODO: do we need cleanup here? + log_process_lifetime_report(PidRef), + catch evict(PidRef), + demonitor(MonRef), + ok; + {'DOWN', MonRef, _Type, _0DPid, _Reason0} -> + %% TODO: should we pass reason to log_process_lifetime_report? + %% Reason = case Reason0 of + %% {shutdown, Shutdown0} -> + %% Shutdown = atom_to_binary(Shutdown0), + %% <<"shutdown: ", Shutdown/binary>>; + %% Reason0 -> + %% Reason0 + %% end, + log_process_lifetime_report(PidRef), + catch evict(PidRef) + end. + +log_process_lifetime_report(PidRef) -> + %% TODO: catch error out of here, report crashes on depth>1 json + %%io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]), + %% TODO: clean this up + case is_enabled() andalso is_logging_enabled() of + true -> + Rctx = get_resource_raw(PidRef), + case should_log(Rctx) of + true -> + couch_log:report("csrt-pid-usage-lifetime", to_json(Rctx)); + _ -> + ok + end; + false -> + ok + end. + +is_logging_enabled() -> + logging_enabled() =/= false. + +logging_enabled() -> + case conf_get("log_pid_usage_report", "coordinator") of + "coordinator" -> + coordinator; + "true" -> + true; + _ -> + false + end. + +should_log(undefined) -> + false; +should_log(#rctx{}=Rctx) -> + should_log(Rctx, logging_enabled()). + +should_log(undefined, _) -> + false; +should_log(#rctx{}, true) -> + true; +should_log(#rctx{}, false) -> + false; +should_log(#rctx{type = {coordinator, _, _}}, coordinator) -> + true; +should_log(#rctx{type = {worker, fabric_rpc, FName}}, _) -> + case conf_get("log_fabric_rpc") of + "true" -> + true; + undefined -> + false; + Name -> + Name =:= atom_to_list(FName) + end; +should_log(#rctx{}, _) -> + false. + +%% +%% gen_server callbacks +%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(?MODULE, [ + named_table, + public, + {decentralized_counters, true}, + {write_concurrency, true}, + {read_concurrency, true}, + {keypos, #rctx.pid_ref} + ]), + {ok, #st{}}. + +handle_call(fetch, _from, #st{} = St) -> + {reply, {ok, St}, St}; +handle_call({call_search, _}, _From, St) -> + %% TODO: provide isolated search queries here + {reply, ok, St}; +handle_call(Msg, _From, St) -> + {stop, {unknown_call, Msg}, St}. 
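+
+%% eg (local.ini): the knobs consulted by is_enabled/0, logging_enabled/0
+%% and should_log/2, with their in-code defaults shown:
+%%   [couch_stats_resource_tracker]
+%%   enabled = true
+%%   log_pid_usage_report = coordinator
+%%   ; log_fabric_rpc = changes  (opt-in per fabric_rpc function, or "true")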
+ +handle_cast(Msg, St) -> + {stop, {unknown_cast, Msg}, St}. + +handle_info(Msg, St) -> + {stop, {unknown_info, Msg}, St}. + +terminate(_Reason, _St) -> + ok. + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +%% +%% private functions +%% + +conf_get(Key) -> + conf_get(Key, undefined). + + +conf_get(Key, Default) -> + config:get(?MODULE_STRING, Key, Default). + +%% +%% Process lifetime logging api +%% + +get_tracker() -> + get(?TRACKER_PID). + +put_tracker(Pid) when is_pid(Pid) -> + put(?TRACKER_PID, Pid). + +evict(PidRef) -> + ets:delete(?MODULE, PidRef). + +stop_tracker(undefined) -> + ok; +stop_tracker(Pid) when is_pid(Pid) -> + Pid ! stop. + diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 729979c53bf..0e1597fb447 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -139,7 +139,7 @@ init_p(From, {M, F, A}, Nonce) -> put('$initial_call', MFA), put(nonce, Nonce), try - couch_stats_resource_tracker:create_context(From, MFA, Nonce), + couch_stats_resource_tracker:create_worker_context(From, MFA, Nonce), couch_stats:maybe_track_rexi_init_p(MFA), apply(M, F, A) catch From e10d82ed87ea029d428b87575bc76c7ebc179dec Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 5 Aug 2024 11:34:16 -0700 Subject: [PATCH 03/15] Remove debug statement --- src/couch_stats/src/couch_stats_resource_tracker.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index 2e3915371e4..c3817cb9028 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -503,7 +503,6 @@ set_context_username(#httpd{user_ctx = Ctx}, PidRef) -> set_context_username(#user_ctx{name = Name}, PidRef) -> set_context_username(Name, PidRef); set_context_username(UserName, PidRef) -> - io:format("SETTING USERNAME TO: ~p~n", [UserName]), is_enabled() andalso update_element(PidRef, [{#rctx.username, UserName}]). destroy_context() -> From b3a38c2ea8873596e3f051025d53a8150a6edf56 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 5 Aug 2024 11:36:23 -0700 Subject: [PATCH 04/15] Inc IOQ calls --- src/ioq/src/ioq.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ioq/src/ioq.erl b/src/ioq/src/ioq.erl index bcb99b2919b..8a7e6894184 100644 --- a/src/ioq/src/ioq.erl +++ b/src/ioq/src/ioq.erl @@ -59,6 +59,7 @@ call_search(Fd, Msg, Metadata) -> call(Fd, Msg, Metadata). call(Fd, Msg, Metadata) -> + couch_stats_resource_tracker:ioq_called(), Priority = io_class(Msg, Metadata), case bypass(Priority) of true -> From a4fc93175a879c37502a7d30846dd0bb26bed450 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 5 Aug 2024 13:36:06 -0700 Subject: [PATCH 05/15] Add csrt delta to rexi_tests --- src/rexi/test/rexi_tests.erl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/rexi/test/rexi_tests.erl b/src/rexi/test/rexi_tests.erl index 18b05b545ca..7500e57e047 100644 --- a/src/rexi/test/rexi_tests.erl +++ b/src/rexi/test/rexi_tests.erl @@ -75,6 +75,7 @@ t_cast(_) -> Ref = rexi:cast(node(), {?MODULE, rpc_test_fun, [potato]}), {Res, Dict} = receive + {Ref, {R, D}, {delta, _}} -> {R, maps:from_list(D)}; {Ref, {R, D}} -> {R, maps:from_list(D)} end, ?assertEqual(potato, Res), @@ -99,7 +100,12 @@ t_cast_explicit_caller(_) -> receive {'DOWN', CallerRef, _, _, Exit} -> Exit end, - ?assertMatch({Ref, {potato, [_ | _]}}, Result). 
+ case couch_stats_resource_tracker:is_enabled() of + true -> + ?assertMatch({Ref, {potato, [_ | _]}, {delta, _}}, Result); + false -> + ?assertMatch({Ref, {potato, [_ | _]}}, Result) + end. t_cast_ref(_) -> put(nonce, yesh), @@ -180,6 +186,7 @@ t_cast_error(_) -> Ref = rexi:cast(node(), self(), {?MODULE, rpc_test_fun, [{error, tomato}]}, []), Res = receive + {Ref, RexiExit, {delta, _}} -> RexiExit; {Ref, RexiExit} -> RexiExit end, ?assertMatch({rexi_EXIT, {tomato, [{?MODULE, rpc_test_fun, 1, _} | _]}}, Res). @@ -188,6 +195,7 @@ t_kill(_) -> Ref = rexi:cast(node(), {?MODULE, rpc_test_fun, [{sleep, 10000}]}), WorkerPid = receive + {Ref, {sleeping, Pid}, {delta, _}} -> Pid; {Ref, {sleeping, Pid}} -> Pid end, ?assert(is_process_alive(WorkerPid)), @@ -207,18 +215,23 @@ t_ping(_) -> rexi:cast(node(), {?MODULE, rpc_test_fun, [ping]}), Res = receive + {rexi, Ping, {delta, _}} -> Ping; {rexi, Ping} -> Ping end, ?assertEqual('$rexi_ping', Res). stream_init(Ref) -> receive + {Ref, From, rexi_STREAM_INIT, {delta, _}} -> + From; {Ref, From, rexi_STREAM_INIT} -> From end. recv(Ref) when is_reference(Ref) -> receive + {Ref, _, Msg, {delta, _}} -> Msg; + {Ref, Msg, {delta, _}} -> Msg; {Ref, _, Msg} -> Msg; {Ref, Msg} -> Msg after 500 -> timeout From d2c7eba12c11302ebbfb9a2793f8df24cb322810 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 19 Aug 2024 17:47:59 -0700 Subject: [PATCH 06/15] Use extract_delta for delta message handling --- src/fabric/src/fabric_util.erl | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 63c70e27068..19804e949eb 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -137,19 +137,20 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) -> Ref = rexi:cast(Node, self(), MFA, [sync]), try receive - {Ref, {ok, Db}} -> - {ok, Db}; - %% TODO: switch to using rexi_utils:extract_delta - {Ref, {ok, Db}, {delta, Delta}} -> + Msg0 -> + {Msg, Delta} = rexi_utils:extract_delta(Msg0), couch_stats_resource_tracker:accumulate_delta(Delta), - {ok, Db}; - {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> - throw(Error); - {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> - throw(Error); - {Ref, Reason} -> - couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]), - get_shard(Rest, Opts, Timeout, Factor) + case Msg of + {Ref, {ok, Db}} -> + {ok, Db}; + {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> + throw(Error); + {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> + throw(Error); + {Ref, Reason} -> + couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]), + get_shard(Rest, Opts, Timeout, Factor) + end after Timeout -> couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]), get_shard(Rest, Opts, Factor * Timeout, Factor) From 7ad90ad729bca138f31be5cfbd20c7264cc5dd7c Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 19 Aug 2024 17:48:48 -0700 Subject: [PATCH 07/15] Cleanup maybe_add_delta logic --- src/rexi/src/rexi.erl | 14 +++----------- src/rexi/src/rexi_server.erl | 7 +------ src/rexi/src/rexi_utils.erl | 25 +++++++++++++++++++++++++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 3f93758bc82..6fce2e5b047 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -104,7 +104,7 @@ kill_all(NodeRefs) when is_list(NodeRefs) -> -spec reply(any()) -> any(). 
reply(Reply) ->
     {Caller, Ref} = get(rexi_from),
-    erlang:send(Caller, maybe_add_delta({Ref, Reply})).
+    erlang:send(Caller, rexi_utils:maybe_add_delta({Ref, Reply})).
 
 %% Private function used by stream2 to initialize the stream. Message is of the
 %% form {OriginalRef, {self(),reference()}, Reply}, which enables the
@@ -188,7 +188,7 @@ stream2(Msg, Limit, Timeout) ->
         {ok, Count} ->
             put(rexi_unacked, Count + 1),
             {Caller, Ref} = get(rexi_from),
-            erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})),
+            erlang:send(Caller, rexi_utils:maybe_add_delta({Ref, self(), Msg})),
             ok
     catch
         throw:timeout ->
@@ -226,7 +226,7 @@ ping() ->
     %% filtered queries will be silent on usage until they finally return
     %% a row or no results. This delay is proportional to the database size,
     %% so instead we make sure ping/0 keeps live stats flowing.
-    erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})).
+    erlang:send(Caller, rexi_utils:maybe_add_delta({rexi, '$rexi_ping'})).
 
 aggregate_server_queue_len() ->
     rexi_server_mon:aggregate_queue_len(rexi_server).
@@ -286,11 +286,3 @@ drain_acks(Count) ->
     after 0 ->
         {ok, Count}
     end.
-
-maybe_add_delta(T) ->
-    case couch_stats_resource_tracker:is_enabled() of
-        false ->
-            T;
-        true ->
-            rexi_utils:add_delta(T, rexi_utils:get_delta())
-    end.
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 0e1597fb447..d74330de6f7 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -209,12 +209,7 @@ find_worker(Ref, Tab) ->
     end.
 
 notify_caller({Caller, Ref}, Reason, Delta) ->
-    Msg = case couch_stats_resource_tracker:is_enabled() of
-        true ->
-            {Ref, {rexi_EXIT, Reason}, {delta, Delta}};
-        false ->
-            {Ref, {rexi_EXIT, Reason}}
-    end,
+    Msg = rexi_utils:maybe_add_delta({Ref, {rexi_EXIT, Reason}}, Delta),
     rexi_utils:send(Caller, Msg).
 
 kill_worker(FromRef, #st{clients = Clients} = St) ->
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index 512932d2730..8b9855169c0 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -14,6 +14,7 @@
 -export([server_pid/1, send/2, recv/6]).
 -export([add_delta/2, extract_delta/1, get_delta/0]).
+-export([maybe_add_delta/1, maybe_add_delta/2]).
 
 %% @doc Return a rexi_server id for the given node.
 server_id(Node) ->
@@ -128,3 +129,27 @@ extract_delta(T) -> {T, undefined}.
 
 get_delta() ->
     {delta, couch_stats_resource_tracker:make_delta()}.
+
+maybe_add_delta(T) ->
+    case couch_stats_resource_tracker:is_enabled() of
+        false ->
+            T;
+        true ->
+            %% Call add_delta/2 directly instead of maybe_add_delta/2 to avoid
+            %% a redundant is_enabled check and a pre-emptive get_delta/0 call
+            add_delta(T, rexi_utils:get_delta())
+    end.
+
+%% Allow for externally provided Delta in error handling scenarios,
+%% e.g. in cases like rexi_server:notify_caller
+maybe_add_delta(T, undefined) ->
+    T;
+maybe_add_delta(T, Delta) when is_map(Delta) ->
+    maybe_add_delta(T, {delta, Delta});
+maybe_add_delta(T, {delta, _} = Delta) ->
+    case couch_stats_resource_tracker:is_enabled() of
+        false ->
+            T;
+        true ->
+            add_delta(T, Delta)
+    end.
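
Patches 06 and 07 together define the two halves of the delta protocol: the worker side attaches an optional {delta, D} element to its reply tuple, and the coordinator side strips it off before dispatching on the message. The sketch below shows that round trip; worker_reply/3 and coordinator_recv/2 are illustrative names only, not functions introduced by this series.

%% Hedged sketch of the delta envelope round trip; worker_reply/3 and
%% coordinator_recv/2 are hypothetical helpers, not part of this series.
worker_reply(Caller, Ref, Reply) ->
    %% Sends {Ref, Reply} when CSRT is disabled, or
    %% {Ref, Reply, {delta, D}} when it is enabled.
    erlang:send(Caller, rexi_utils:maybe_add_delta({Ref, Reply})).

coordinator_recv(Ref, Timeout) ->
    receive
        Msg0 ->
            %% extract_delta/1 returns {Msg, undefined} for plain messages,
            %% which accumulate_delta/1 is assumed to tolerate, as in the
            %% fabric_util:get_shard/4 change above.
            {Msg, Delta} = rexi_utils:extract_delta(Msg0),
            couch_stats_resource_tracker:accumulate_delta(Delta),
            case Msg of
                {Ref, Reply} -> {ok, Reply};
                Other -> {error, {unexpected_msg, Other}}
            end
    after Timeout ->
        {error, timeout}
    end.

Because both maybe_add_delta/1 and extract_delta/1 degrade to the plain two-tuple shape when couch_stats_resource_tracker:is_enabled() returns false, callers such as the rexi_tests above can match on either message shape.
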
From 03fa0cbb106e1985d2bc43fc6f292f5d800dca96 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Wed, 21 Aug 2024 14:10:36 -0700 Subject: [PATCH 08/15] Rework btree stats allowing for write_node stats --- src/couch/priv/stats_descriptions.cfg | 12 +++++-- src/couch/src/couch_btree.erl | 4 +-- .../src/couch_stats_resource_tracker.erl | 35 +++++++++++++------ 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg index 33c5565219d..9d5bd113392 100644 --- a/src/couch/priv/stats_descriptions.cfg +++ b/src/couch/priv/stats_descriptions.cfg @@ -442,14 +442,22 @@ {type, counter}, {desc, <<"number of couch btree kv fold callback invocations">>} ]}. -{[couchdb, btree, kp_node], [ +{[couchdb, btree, get_node, kp_node], [ {type, counter}, {desc, <<"number of couch btree kp_nodes read">>} ]}. -{[couchdb, btree, kv_node], [ +{[couchdb, btree, get_node, kv_node], [ {type, counter}, {desc, <<"number of couch btree kv_nodes read">>} ]}. +{[couchdb, btree, write_node, kp_node], [ + {type, counter}, + {desc, <<"number of couch btree kp_nodes written">>} +]}. +{[couchdb, btree, write_node, kv_node], [ + {type, counter}, + {desc, <<"number of couch btree kv_nodes written">>} +]}. %% CSRT (couch_stats_resource_tracker) stats {[couchdb, csrt, delta_missing_t0], [ {type, counter}, diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index 27b5bc18b2e..ba176cca2ca 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -473,7 +473,7 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) -> get_node(#btree{fd = Fd}, NodePos) -> {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), %% TODO: wire in csrt tracking - couch_stats:increment_counter([couchdb, btree, NodeType]), + couch_stats:increment_counter([couchdb, btree, get_node, NodeType]), {NodeType, NodeList}. write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) -> @@ -482,6 +482,7 @@ write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) -> % now write out each chunk and return the KeyPointer pairs for those nodes ToWrite = [{NodeType, Chunk} || Chunk <- Chunks], WriteOpts = [{compression, Comp}], + couch_stats:increment_counter([couchdb, btree, write_node, NodeType]), {ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite, WriteOpts), {ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}. @@ -1165,7 +1166,6 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) - false -> {stop, {PrevKVs, Reds}, Acc}; true -> - couch_stats:increment_counter([couchdb, btree, folds]), AssembledKV = assemble(Bt, K, V), case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of {ok, Acc2} -> diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index c3817cb9028..3e75be427f3 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -100,6 +100,8 @@ -define(COUCH_SERVER_OPEN, db_open). -define(COUCH_BT_GET_KP_NODE, get_kp_node). -define(COUCH_BT_GET_KV_NODE, get_kv_node). +-define(COUCH_BT_WRITE_KP_NODE, write_kp_node). +-define(COUCH_BT_WRITE_KV_NODE, write_kv_node). -define(COUCH_JS_FILTER, js_filter). -define(COUCH_JS_FILTER_ERROR, js_filter_error). -define(COUCH_JS_FILTERED_DOCS, js_filtered_docs). 
@@ -143,7 +145,9 @@
 %% TODO: switch record definitions to be macro based, eg:
 %%  ?COUCH_BT_GET_KP_NODE = 0,
     get_kv_node = 0,
-    get_kp_node = 0
+    get_kp_node = 0,
+    write_kv_node = 0,
+    write_kp_node = 0
 }).
 
 %%
@@ -390,8 +394,10 @@ to_json(#rctx{}=Rctx) ->
         js_filter_error = JSFilterErrors,
         js_filtered_docs = JSFilteredDocss,
         type = Type,
-        get_kp_node = KpNodes,
-        get_kv_node = KvNodes,
+        get_kp_node = GetKpNodes,
+        get_kv_node = GetKvNodes,
+        %%write_kp_node = WriteKpNodes,
+        %%write_kv_node = WriteKvNodes,
         changes_returned = ChangesReturned,
         ioq_calls = IoqCalls
     } = Rctx,
@@ -412,8 +418,10 @@
         js_filtered_docs => JSFilteredDocss,
         rows_read => RowsRead,
         type => convert_type(Type),
-        kp_nodes => KpNodes,
-        kv_nodes => KvNodes,
+        get_kp_nodes => GetKpNodes,
+        get_kv_nodes => GetKvNodes,
+        %%write_kp_nodes => WriteKpNodes,
+        %%write_kv_nodes => WriteKvNodes,
         changes_returned => ChangesReturned,
         ioq_calls => IoqCalls
     }.
@@ -564,10 +572,17 @@ maybe_inc([fabric_rpc, view, rows_read], Val) ->
     inc(?ROWS_READ, Val);
 maybe_inc([couchdb, couch_server, open], Val) ->
     inc(?DB_OPEN, Val);
-maybe_inc([couchdb, btree, kp_node], Val) ->
+maybe_inc([couchdb, btree, get_node, kp_node], Val) ->
     inc(?COUCH_BT_GET_KP_NODE, Val);
-maybe_inc([couchdb, btree, kv_node], Val) ->
+maybe_inc([couchdb, btree, get_node, kv_node], Val) ->
     inc(?COUCH_BT_GET_KV_NODE, Val);
+%% The write_node logic won't pick up writes, as none of the RPC
+%% processes actually perform the write operation
+%% TODO: bubble up induced work from other processes
+maybe_inc([couchdb, btree, write_node, kp_node], Val) ->
+    inc(?COUCH_BT_WRITE_KP_NODE, Val);
+maybe_inc([couchdb, btree, write_node, kv_node], Val) ->
+    inc(?COUCH_BT_WRITE_KV_NODE, Val);
 maybe_inc([couchdb, query_server, js_filter_error], Val) ->
     inc(?COUCH_JS_FILTER_ERROR, Val);
 maybe_inc([couchdb, query_server, js_filter], Val) ->
@@ -762,9 +777,6 @@ tracker({Pid, _Ref}=PidRef) ->
     end.
 
 log_process_lifetime_report(PidRef) ->
-    %% TODO: catch error out of here, report crashes on depth>1 json
-    %%io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]),
-    %% TODO: clean this up
     case is_enabled() andalso is_logging_enabled() of
         true ->
             Rctx = get_resource_raw(PidRef),
@@ -782,7 +794,8 @@ is_logging_enabled() ->
     logging_enabled() =/= false.
logging_enabled() -> - case conf_get("log_pid_usage_report", "coordinator") of + %%case conf_get("log_pid_usage_report", "coordinator") of + case conf_get("log_pid_usage_report", "true") of "coordinator" -> coordinator; "true" -> From bb00b81cab246b2c9c5739b7c2328ea8871e55d2 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Wed, 21 Aug 2024 16:19:40 -0700 Subject: [PATCH 09/15] Handle unexpected get_shard messages --- src/fabric/src/fabric_util.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 19804e949eb..04b641a113f 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -149,6 +149,9 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) -> throw(Error); {Ref, Reason} -> couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]), + get_shard(Rest, Opts, Timeout, Factor); + _ -> + ?LOG_UNEXPECTED_MSG(Msg), get_shard(Rest, Opts, Timeout, Factor) end after Timeout -> From edd5d0ee9b279697304f6d781f8fd7280c47695f Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Fri, 23 Aug 2024 15:33:26 -0700 Subject: [PATCH 10/15] Restore coordinator logging only default --- src/couch_stats/src/couch_stats_resource_tracker.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index 3e75be427f3..cb7a825beb0 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -794,8 +794,7 @@ is_logging_enabled() -> logging_enabled() =/= false. logging_enabled() -> - %%case conf_get("log_pid_usage_report", "coordinator") of - case conf_get("log_pid_usage_report", "true") of + case conf_get("log_pid_usage_report", "coordinator") of "coordinator" -> coordinator; "true" -> From f06d4a4bc8cdc54013e7bfe4996a21cb62076b65 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Fri, 23 Aug 2024 15:58:16 -0700 Subject: [PATCH 11/15] Handle old fabric_util:get_shard Ref responses --- src/fabric/src/fabric_util.erl | 48 ++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 04b641a113f..52032f0ecc3 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -136,32 +136,36 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) -> MFA = {fabric_rpc, open_shard, [Name, [{timeout, Timeout} | Opts]]}, Ref = rexi:cast(Node, self(), MFA, [sync]), try - receive - Msg0 -> - {Msg, Delta} = rexi_utils:extract_delta(Msg0), - couch_stats_resource_tracker:accumulate_delta(Delta), - case Msg of - {Ref, {ok, Db}} -> - {ok, Db}; - {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> - throw(Error); - {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> - throw(Error); - {Ref, Reason} -> - couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]), - get_shard(Rest, Opts, Timeout, Factor); - _ -> - ?LOG_UNEXPECTED_MSG(Msg), - get_shard(Rest, Opts, Timeout, Factor) - end - after Timeout -> - couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]), - get_shard(Rest, Opts, Factor * Timeout, Factor) - end + await_shard_response(Ref, Name, Rest, Opts, Factor, Timeout) after rexi_monitor:stop(Mon) end. 
+await_shard_response(Ref, Name, Rest, Opts, Factor, Timeout) -> + receive + Msg0 -> + {Msg, Delta} = rexi_utils:extract_delta(Msg0), + couch_stats_resource_tracker:accumulate_delta(Delta), + case Msg of + {Ref, {ok, Db}} -> + {ok, Db}; + {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> + throw(Error); + {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> + throw(Error); + {Ref, Reason} -> + couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]), + get_shard(Rest, Opts, Timeout, Factor); + %% {OldRef, {ok, Db}} -> <-- stale db resp that got here late, should we do something? + _ -> + %% Got a message from an old Ref that timed out, try again + await_shard_response(Ref, Name, Rest, Opts, Factor, Timeout) + end + after Timeout -> + couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]), + get_shard(Rest, Opts, Factor * Timeout, Factor) + end. + get_db_timeout(N, Factor, MinTimeout, infinity) -> % MaxTimeout may be infinity so we just use the largest Erlang small int to % avoid blowing up the arithmetic From c02d57b6616b9906df776b520fe0a2b2b033bade Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Tue, 27 Aug 2024 16:31:33 -0700 Subject: [PATCH 12/15] Cleanup _Persist usage --- src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl b/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl index 01ef16f23e8..da60e85e604 100644 --- a/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl +++ b/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl @@ -24,8 +24,9 @@ setup() -> Hashed = couch_passwords:hash_admin_password(?PASS), - ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist = false), - ok = config:set("couchdb", "max_document_size", "50"), + ok = config:set("admins", ?USER, ?b2l(Hashed), false), + ok = config:set("couchdb", "max_document_size", "50", false), + TmpDb = ?tempdb(), Addr = config:get("chttpd", "bind_address", "127.0.0.1"), Port = mochiweb_socket_server:get(chttpd, port), @@ -35,7 +36,7 @@ setup() -> teardown(Url) -> delete_db(Url), - ok = config:delete("admins", ?USER, _Persist = false), + ok = config:delete("admins", ?USER, false), ok = config:delete("couchdb", "max_document_size"). create_db(Url) -> From 5864ba3c0b77f312c8244787344d8c2f9accf059 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Tue, 27 Aug 2024 16:33:27 -0700 Subject: [PATCH 13/15] Rework CSRT to use ets:fun2ms for static queries --- .../src/couch_stats_resource_tracker.erl | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index cb7a825beb0..3e3a75f37f4 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -63,9 +63,9 @@ %% aggregate query api -export([ - active/0, - active_coordinators/0, - active_workers/0, + active/0, active/1, + active_coordinators/0, active_coordinators/1, + active_workers/0, active_workers/1, count_by/1, group_by/2, group_by/3, @@ -87,6 +87,7 @@ ]). -include_lib("couch/include/couch_db.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). %% Module pdict markers -define(DELTA_TA, csrt_delta_ta). @@ -219,23 +220,26 @@ active() -> active_int(all). active_coordinators() -> active_int(coordinators). active_workers() -> active_int(workers). +%% active_json() or active(json)? 
+active(json) -> to_json_list(active_int(all)). +active_coordinators(json) -> to_json_list(active_int(coordinators)). +active_workers(json) -> to_json_list(active_int(workers)). + active_int(coordinators) -> select_by_type(coordinators); active_int(workers) -> select_by_type(workers); active_int(all) -> - lists:map(fun to_json/1, ets:tab2list(?MODULE)). + select_by_type(all). select_by_type(coordinators) -> - ets:select(couch_stats_resource_tracker, - [{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]); + ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {coordinator, _, _}} = R) -> R end)); select_by_type(workers) -> - ets:select(couch_stats_resource_tracker, - [{#rctx{type = {worker,'_','_'}, _ = '_'}, [], ['$_']}]); + ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end)); select_by_type(all) -> - lists:map(fun to_json/1, ets:tab2list(?MODULE)). + ets:tab2list(?MODULE). find_by_pid(Pid) -> [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})]. @@ -362,6 +366,11 @@ convert_type(null) -> convert_type(undefined) -> null. +convert_path(undefined) -> + null; +convert_path(Path) when is_binary(Path) -> + Path. + convert_pidref({Parent0, ParentRef0}) -> Parent = convert_pid(Parent0), ParentRef = convert_ref(ParentRef0), @@ -386,6 +395,7 @@ to_json(#rctx{}=Rctx) -> nonce = Nonce, from = From, dbname = DbName, + path = Path, username = UserName, db_open = DbOpens, docs_read = DocsRead, @@ -410,6 +420,7 @@ to_json(#rctx{}=Rctx) -> nonce => Nonce, from => convert_pidref(From), dbname => DbName, + path => convert_path(Path), username => UserName, db_open => DbOpens, docs_read => DocsRead, @@ -755,6 +766,7 @@ track(#rctx{pid_ref=PidRef}) -> end. tracker({Pid, _Ref}=PidRef) -> + %%io:format("[~p]SPAWNED IN A TRACKER FOR ~p~n", [self(), PidRef]), MonRef = erlang:monitor(process, Pid), receive stop -> @@ -763,7 +775,9 @@ tracker({Pid, _Ref}=PidRef) -> catch evict(PidRef), demonitor(MonRef), ok; - {'DOWN', MonRef, _Type, _0DPid, _Reason0} -> + {'DOWN', MonRef, _Type, _0DPid, Reason0} -> + destroy_context(PidRef), + io:format("[~p]SPAWNED IN A TRACKER FOR ~p:: GOT DOWN WITH {~p}~n", [self(), Pid, Reason0]), %% TODO: should we pass reason to log_process_lifetime_report? %% Reason = case Reason0 of %% {shutdown, Shutdown0} -> @@ -877,6 +891,9 @@ conf_get(Key) -> conf_get(Key, Default) -> config:get(?MODULE_STRING, Key, Default). +to_json_list(List) when is_list(List) -> + lists:map(fun to_json/1, List). + %% %% Process lifetime logging api %% From e1fead4e54f608b3ab98978bd5caea6d2b23d4d4 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Wed, 28 Aug 2024 11:19:07 -0700 Subject: [PATCH 14/15] Remove debug statements --- src/couch_stats/src/couch_stats_resource_tracker.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index 3e3a75f37f4..98459d3ba68 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -766,7 +766,6 @@ track(#rctx{pid_ref=PidRef}) -> end. 
tracker({Pid, _Ref}=PidRef) ->
-    %%io:format("[~p]SPAWNED IN A TRACKER FOR ~p~n", [self(), PidRef]),
     MonRef = erlang:monitor(process, Pid),
     receive
         stop ->
@@ -775,9 +774,8 @@
             catch evict(PidRef),
             demonitor(MonRef),
             ok;
-        {'DOWN', MonRef, _Type, _0DPid, Reason0} ->
+        {'DOWN', MonRef, _Type, _0DPid, _Reason0} ->
             destroy_context(PidRef),
-            io:format("[~p]SPAWNED IN A TRACKER FOR ~p:: GOT DOWN WITH {~p}~n", [self(), Pid, Reason0]),
             %% TODO: should we pass reason to log_process_lifetime_report?
             %% Reason = case Reason0 of
             %%     {shutdown, Shutdown0} ->

From 4535f1a1fa1fc902781a744b105fd7c1147c5efe Mon Sep 17 00:00:00 2001
From: Russell Branca
Date: Mon, 2 Sep 2024 16:44:40 -0700
Subject: [PATCH 15/15] Add find by nonce and some cleanup

---
 .../src/couch_stats_resource_tracker.erl       | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl
index 98459d3ba68..0594338bd2c 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.erl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.erl
@@ -40,7 +40,7 @@
     create_worker_context/3,
     destroy_context/0, destroy_context/1,
-    get_resource/0, get_resource/1, get_resource_raw/1,
+    get_resource/0, get_resource/1,
     set_context_dbname/1, set_context_dbname/2,
     set_context_handler_fun/1, set_context_handler_fun/2,
@@ -74,6 +74,7 @@
     find_by_pid/1,
     find_by_pidref/1,
+    find_by_nonce/1,
     find_workers_by_pidref/1
 ]).
@@ -193,12 +194,7 @@ get_resource() ->
 get_resource(undefined) ->
     undefined;
 get_resource(PidRef) ->
-    catch get_resource_raw(PidRef).
-
-get_resource_raw(undefined) ->
-    undefined;
-get_resource_raw(PidRef) ->
-    case ets:lookup(?MODULE, PidRef) of
+    catch case ets:lookup(?MODULE, PidRef) of
         [#rctx{}=Rctx] ->
             Rctx;
         [] ->
@@ -241,6 +237,9 @@ select_by_type(workers) ->
 select_by_type(all) ->
     ets:tab2list(?MODULE).
 
+find_by_nonce(Nonce) ->
+    ets:select(?MODULE, ets:fun2ms(fun(#rctx{nonce = Nonce1} = R) when Nonce =:= Nonce1 -> R end)).
+
 find_by_pid(Pid) ->
     [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})].
@@ -634,7 +633,7 @@ should_track(_Metric) ->
     false.
 
 ioq_called() ->
-    is_enabled() andalso inc(ioq_calls).
+    inc(ioq_calls).
 
 accumulate_delta(Delta) when is_map(Delta) ->
     %% TODO: switch to creating a batch of updates to invoke a single
@@ -784,6 +783,7 @@ tracker({Pid, _Ref}=PidRef) ->
             %%     Reason0 ->
             %%         Reason0
             %% end,
+            %% TODO: should we send the induced work delta to the coordinator?
             log_process_lifetime_report(PidRef),
             catch evict(PidRef)
     end.
@@ -791,7 +791,7 @@ tracker({Pid, _Ref}=PidRef) ->
 log_process_lifetime_report(PidRef) ->
     case is_enabled() andalso is_logging_enabled() of
         true ->
-            Rctx = get_resource_raw(PidRef),
+            Rctx = get_resource(PidRef),
             case should_log(Rctx) of
                 true ->
                     couch_log:report("csrt-pid-usage-lifetime", to_json(Rctx));
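
A closing note on the fun2ms idiom that patches 13 and 15 converge on: ets:fun2ms/1 is expanded at compile time by the ms_transform parse transform (hence the ms_transform.hrl include added in patch 13) into a match specification, which ets:select/2 runs inside the table rather than copying candidate rows out to the caller; plain match patterns, as in find_by_pid/1, remain the domain of ets:match_object/2. A minimal sketch of the same idiom applied to another #rctx field; find_by_dbname/1 is illustrative only and not part of this series.

%% Hypothetical example of the ets:fun2ms/1 + ets:select/2 pattern used by
%% select_by_type/1 and find_by_nonce/1; find_by_dbname/1 is not in the series.
%% Assumes: -include_lib("stdlib/include/ms_transform.hrl").
find_by_dbname(DbName) ->
    %% The literal fun is rewritten at compile time into a match spec roughly
    %% like [{#rctx{dbname = '$1', _ = '_'},
    %%        [{'=:=', '$1', {const, DbName}}], ['$_']}].
    ets:select(?MODULE,
        ets:fun2ms(fun(#rctx{dbname = D} = R) when D =:= DbName -> R end)).
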