From 58bf75890c84496654c3700cd7fc69e12176ad6d Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Tue, 6 Jun 2017 17:03:57 +0300 Subject: [PATCH 1/2] UCT/API/UCP: Hide slow-path progress elem and refcount in uct_worker. This commit does some preparations in UCT for the change in callback queue API: + Block async context outside of callback queue calls. + Keep callback reference count in a handle in uct_base_iface_t/ + Instead of exposing slow-path callback queue element in UCT API, hide it inside uct worker code. This will allocate the element internally and release when finished. --- src/ucp/core/ucp_ep.c | 34 +++++------- src/ucp/core/ucp_request.h | 7 +-- src/ucp/wireup/stub_ep.c | 16 +++--- src/ucp/wireup/stub_ep.h | 2 +- src/ucs/async/signal.h | 1 + src/uct/api/uct.h | 62 +++++----------------- src/uct/api/uct_def.h | 1 + src/uct/base/uct_iface.c | 1 + src/uct/base/uct_iface.h | 3 ++ src/uct/base/uct_worker.c | 80 ++++++++++++++++++++++++----- src/uct/base/uct_worker.h | 15 ++++++ src/uct/ib/cm/cm.h | 3 +- src/uct/ib/cm/cm_iface.c | 27 ++++------ src/uct/ib/dc/accel/dc_mlx5.c | 5 +- src/uct/ib/dc/verbs/dc_verbs.c | 5 +- src/uct/ib/rc/accel/rc_mlx5_ep.c | 5 +- src/uct/ib/rc/verbs/rc_verbs_ep.c | 5 +- src/uct/ib/ud/base/ud_ep.c | 5 +- src/uct/sm/mm/mm_ep.c | 35 ++++--------- src/uct/sm/mm/mm_ep.h | 3 +- src/uct/sm/mm/mm_iface.c | 14 +++-- src/uct/ugni/rdma/ugni_rdma_iface.c | 5 +- src/uct/ugni/smsg/ugni_smsg_iface.c | 5 +- src/uct/ugni/udt/ugni_udt_ep.c | 7 +-- 24 files changed, 185 insertions(+), 161 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 9b737584f4d..7a46cad61cc 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -330,16 +330,13 @@ static void ucp_ep_flush_progress(ucp_request_t *req) static void ucp_ep_flush_slow_path_remove(ucp_request_t *req) { ucp_ep_h ep = req->send.ep; - if (req->send.flush.cbq_elem_on) { - uct_worker_slowpath_progress_unregister(ep->worker->uct, - &req->send.flush.cbq_elem); - req->send.flush.cbq_elem_on = 0; - } + uct_worker_progress_unregister_safe(ep->worker->uct, + &req->send.flush.slow_cb_id); } -static void ucp_ep_flushed_slow_path_callback(ucs_callbackq_slow_elem_t *self) +static void ucp_ep_flushed_slow_path_callback(void *arg) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.flush.cbq_elem); + ucp_request_t *req = arg; ucp_ep_h ep = req->send.ep; ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED)); @@ -365,16 +362,15 @@ static int ucp_flush_check_completion(ucp_request_t *req) ucs_trace("adding slow-path callback to destroy ep %p", ep); ucp_ep_flush_slow_path_remove(req); - req->send.flush.cbq_elem.cb = ucp_ep_flushed_slow_path_callback; - req->send.flush.cbq_elem_on = 1; - uct_worker_slowpath_progress_register(ep->worker->uct, - &req->send.flush.cbq_elem); + uct_worker_progress_register_safe(ep->worker->uct, + ucp_ep_flushed_slow_path_callback, req, 0, + &req->send.flush.slow_cb_id); return 1; } -static void ucp_ep_flush_resume_slow_path_callback(ucs_callbackq_slow_elem_t *self) +static void ucp_ep_flush_resume_slow_path_callback(void *arg) { - ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.flush.cbq_elem); + ucp_request_t *req = arg; ucp_ep_flush_slow_path_remove(req); ucp_ep_flush_progress(req); @@ -405,12 +401,11 @@ static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self) completed = ucp_flush_check_completion(req); /* If the operation has not completed, add slow-path progress to resume */ - if (!completed && req->send.flush.lanes && !req->send.flush.cbq_elem_on) { + if (!completed && req->send.flush.lanes) { ucs_trace("ep %p: adding slow-path callback to resume flush", ep); - req->send.flush.cbq_elem.cb = ucp_ep_flush_resume_slow_path_callback; - req->send.flush.cbq_elem_on = 1; - uct_worker_slowpath_progress_register(ep->worker->uct, - &req->send.flush.cbq_elem); + uct_worker_progress_register_safe(ep->worker->uct, + ucp_ep_flush_resume_slow_path_callback, + req, 0, &req->send.flush.slow_cb_id); } if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { @@ -520,8 +515,7 @@ static ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep) req->send.ep = ep; req->send.flush.flushed_cb = ucp_ep_flushed_callback; req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep)); - req->send.flush.cbq_elem.cb = ucp_ep_flushed_slow_path_callback; - req->send.flush.cbq_elem_on = 0; + req->send.flush.slow_cb_id = NULL; req->send.lane = UCP_NULL_LANE; req->send.uct.func = ucp_ep_flush_progress_pending; req->send.uct_comp.func = ucp_ep_flush_completion; diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index a17a0d88512..f4c1d523b3b 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -116,10 +116,10 @@ struct ucp_request { struct { ucp_request_callback_t flushed_cb;/* Called when flushed */ - ucs_callbackq_slow_elem_t cbq_elem; /* Slow-path callback */ - uint8_t cbq_elem_on; + uct_worker_cb_id_t slow_cb_id;/* Slow-path callback */ ucp_lane_map_t lanes; /* Which lanes need to be flushed */ } flush; + struct { uint64_t remote_addr; /* Remote address */ ucp_atomic_fetch_op_t op; /* Requested AMO */ @@ -127,10 +127,11 @@ struct ucp_request { uint64_t value; void *result; } amo; + struct { void *rndv_op; /* Handler of issued rndv send. Needs to cancel the operation if it is completed by SW */ - } tag_offload; + } tag_offload; }; diff --git a/src/ucp/wireup/stub_ep.c b/src/ucp/wireup/stub_ep.c index 68be9f57788..64a86e9771f 100644 --- a/src/ucp/wireup/stub_ep.c +++ b/src/ucp/wireup/stub_ep.c @@ -52,9 +52,9 @@ static ucs_status_t ucp_stub_ep_connect_to_ep(uct_ep_h uct_ep, * We switch the endpoint in this function (instead in wireup code) since * this is guaranteed to run from the main thread. */ -static void ucp_stub_ep_progress(ucs_callbackq_slow_elem_t *elem) +static void ucp_stub_ep_progress(void *arg) { - ucp_stub_ep_t *stub_ep = ucs_container_of(elem, ucp_stub_ep_t, elem); + ucp_stub_ep_t *stub_ep = arg; ucp_ep_h ep = stub_ep->ep; ucs_queue_head_t tmp_pending_queue; uct_pending_req_t *uct_req; @@ -313,7 +313,7 @@ UCS_CLASS_INIT_FUNC(ucp_stub_ep_t, ucp_ep_h ep) self->aux_rsc_index = UCP_NULL_RESOURCE; self->pending_count = 0; self->flags = 0; - self->elem.cb = ucp_stub_ep_progress; + self->progress_id = NULL; ucs_queue_head_init(&self->pending_q); ucs_trace("ep %p: created stub ep %p to %s ", ep, self, ucp_ep_peer_name(ep)); return UCS_OK; @@ -326,10 +326,7 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_stub_ep_t) ucs_debug("ep %p: destroy stub ep %p", self->ep, self); - if (self->flags & UCP_STUB_EP_FLAG_READY) { - uct_worker_slowpath_progress_unregister(self->ep->worker->uct, - &self->elem); - } + uct_worker_progress_unregister_safe(self->ep->worker->uct, &self->progress_id); if (self->aux_ep != NULL) { uct_ep_destroy(self->aux_ep); } @@ -409,8 +406,9 @@ void ucp_stub_ep_remote_connected(uct_ep_h uct_ep) ucs_trace("ep %p: stub ep %p is remote-connected", stub_ep->ep, stub_ep); stub_ep->flags |= UCP_STUB_EP_FLAG_READY; - uct_worker_slowpath_progress_register(stub_ep->ep->worker->uct, - &stub_ep->elem); + uct_worker_progress_register_safe(stub_ep->ep->worker->uct, + ucp_stub_ep_progress, stub_ep, 0, + &stub_ep->progress_id); } int ucp_stub_ep_test(uct_ep_h uct_ep) diff --git a/src/ucp/wireup/stub_ep.h b/src/ucp/wireup/stub_ep.h index 7cf40ee0412..24d6d2083c3 100644 --- a/src/ucp/wireup/stub_ep.h +++ b/src/ucp/wireup/stub_ep.h @@ -39,7 +39,7 @@ struct ucp_stub_ep { ucp_rsc_index_t aux_rsc_index; /**< Index of auxiliary transport */ volatile uint32_t pending_count; /**< Number of pending wireup operations */ volatile uint32_t flags; /**< Connection state flags */ - ucs_callbackq_slow_elem_t elem; /**< Slow-path callback */ + uct_worker_cb_id_t progress_id; /**< ID of progress function */ }; diff --git a/src/ucs/async/signal.h b/src/ucs/async/signal.h index 659f52f29d3..51ed37cf358 100644 --- a/src/ucs/async/signal.h +++ b/src/ucs/async/signal.h @@ -10,6 +10,7 @@ #include #include +#include /* for ucs_get_tid() */ #include diff --git a/src/uct/api/uct.h b/src/uct/api/uct.h index 40cf7102370..6f5e9c27ab8 100644 --- a/src/uct/api/uct.h +++ b/src/uct/api/uct.h @@ -723,42 +723,6 @@ void uct_worker_destroy(uct_worker_h worker); void uct_worker_progress(uct_worker_h worker); -/** - * @ingroup UCT_CONTEXT - * @brief Add a callback function to a worker progress. - * - * Add a function which will be called every time a progress is made on the worker. - * - * @param [in] worker Handle to worker. - * @param [in] func Pointer to callback function. - * @param [in] arg Argument to the function. - * - * @note If the same function and argument are already on the list, their reference - * count will be incremented. - * @note This operation could potentially be slow. - */ -void uct_worker_progress_register(uct_worker_h worker, - ucs_callback_t func, void *arg); - - -/** - * @ingroup UCT_CONTEXT - * @brief Remove a callback function from worker's progress. - * - * Remove a previously added function from worker's progress. - * - * @param [in] worker Handle to worker. - * @param [in] func Pointer to callback function. - * @param [in] arg Argument to the function. - * - * @note If the reference count of the function is >1, it will be decremented and - * the function will not be removed. - * @note This operation could potentially be slow. - */ -void uct_worker_progress_unregister(uct_worker_h worker, - ucs_callback_t func, void *arg); - - /** * @ingroup UCT_CONTEXT * @brief Add a slow path callback function to a worker progress. @@ -768,13 +732,15 @@ void uct_worker_progress_unregister(uct_worker_h worker, * element is allocated by the caller, but the overhead of calling this function * is slightly higher than @ref uct_worker_progress_register. * - * @param [in] worker Handle to worker. - * @param [in] elem Callback function to add, with it's associated context. - * - * @note This operation could potentially be slow. + * @param [in] worker Handle to worker. + * @param [in] func Pointer to callback function. + * @param [in] arg Argument to the function. + * @param [in] flags Callback flags, see @ref ucs_callbackq_flags + * @param [inout] id_p Filled with callback ID. */ -void uct_worker_slowpath_progress_register(uct_worker_h worker, - ucs_callbackq_slow_elem_t *elem); +void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func, + void *arg, unsigned flags, + uct_worker_cb_id_t *id_p); /** @@ -783,14 +749,12 @@ void uct_worker_slowpath_progress_register(uct_worker_h worker, * * Remove a function previously added by @ref uct_worker_slowpath_progress_register. * - * @param [in] worker Handle to worker. - * @param [in] elem Callback element to remove. Must be the same pointer - * added earlier. - * - * @note This operation could potentially be slow. + * @param [in] worker Handle to worker. + * @param [inout] id_p Callback ID to remove, after a call to this + * function it's set to UCS_CALLBACKQ_ID_NULL. */ -void uct_worker_slowpath_progress_unregister(uct_worker_h worker, - ucs_callbackq_slow_elem_t *elem); +void uct_worker_progress_unregister_safe(uct_worker_h worker, + uct_worker_cb_id_t *id_p); /** diff --git a/src/uct/api/uct_def.h b/src/uct/api/uct_def.h index 85ef088d9b5..a7492271251 100644 --- a/src/uct/api/uct_def.h +++ b/src/uct/api/uct_def.h @@ -103,6 +103,7 @@ typedef struct uct_iface_addr uct_iface_addr_t; typedef struct uct_ep_addr uct_ep_addr_t; typedef struct uct_tag_context uct_tag_context_t; typedef uint64_t uct_tag_t; /* tag type - 64 bit */ +typedef void* uct_worker_cb_id_t; /** * @} */ diff --git a/src/uct/base/uct_iface.c b/src/uct/base/uct_iface.c index 5cffaa3cda1..62b1125e267 100644 --- a/src/uct/base/uct_iface.c +++ b/src/uct/base/uct_iface.c @@ -380,6 +380,7 @@ UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops, uct_md_h md, self->am_tracer_arg = NULL; self->err_handler = params->err_handler; self->err_handler_arg = params->err_handler_arg; + uct_worker_progress_init(&self->prog); for (id = 0; id < UCT_AM_ID_MAX; ++id) { uct_iface_set_stub_am_handler(self, id); diff --git a/src/uct/base/uct_iface.h b/src/uct/base/uct_iface.h index 4f39ec259d5..c20b5d0c53f 100644 --- a/src/uct/base/uct_iface.h +++ b/src/uct/base/uct_iface.h @@ -7,6 +7,8 @@ #ifndef UCT_IFACE_H_ #define UCT_IFACE_H_ +#include "uct_worker.h" + #include #include #include @@ -178,6 +180,7 @@ typedef struct uct_base_iface { uct_iface_t super; uct_md_h md; /* MD this interface is using */ uct_worker_h worker; /* Worker this interface is on */ + uct_worker_progress_t prog; UCS_STATS_NODE_DECLARE(stats); /* Statistics */ uct_am_handler_t am[UCT_AM_ID_MAX]; /* Active message table */ uct_am_tracer_t am_tracer; /* Active message tracer */ diff --git a/src/uct/base/uct_worker.c b/src/uct/base/uct_worker.c index a4106963fcb..bc923234fd0 100644 --- a/src/uct/base/uct_worker.c +++ b/src/uct/base/uct_worker.c @@ -9,15 +9,23 @@ #include "uct_worker.h" #include +#include #include +typedef struct uct_worker_slowpath_elem { + ucs_callbackq_slow_elem_t super; + ucs_callback_t cb; + void *arg; +} uct_worker_slowpath_elem_t; + + static UCS_CLASS_INIT_FUNC(uct_worker_t, ucs_async_context_t *async, ucs_thread_mode_t thread_mode) { self->async = async; self->thread_mode = thread_mode; - ucs_callbackq_init(&self->progress_q, 64, async); + ucs_callbackq_init(&self->progress_q, 64, NULL); ucs_list_head_init(&self->tl_data); return UCS_OK; } @@ -37,27 +45,75 @@ void uct_worker_progress(uct_worker_h worker) ucs_callbackq_dispatch(&worker->progress_q); } -void uct_worker_progress_register(uct_worker_h worker, - ucs_callback_t func, void *arg) +void uct_worker_progress_init(uct_worker_progress_t *prog) +{ + prog->cb = NULL; + prog->arg = NULL; + prog->refcount = 0; +} + +void uct_worker_progress_register(uct_worker_h worker, ucs_callback_t cb, + void *arg, uct_worker_progress_t *prog) { - ucs_callbackq_add(&worker->progress_q, func, arg); + UCS_ASYNC_BLOCK(worker->async); + if (prog->refcount++ == 0) { + prog->cb = cb; + prog->arg = arg; + ucs_callbackq_add(&worker->progress_q, cb, arg); + } + UCS_ASYNC_UNBLOCK(worker->async); } void uct_worker_progress_unregister(uct_worker_h worker, - ucs_callback_t func, void *arg) + uct_worker_progress_t *prog) { - ucs_callbackq_remove(&worker->progress_q, func, arg); + UCS_ASYNC_BLOCK(worker->async); + ucs_assert(prog->refcount > 0); + if (--prog->refcount == 0) { + ucs_callbackq_remove(&worker->progress_q, prog->cb, prog->arg); + } + UCS_ASYNC_UNBLOCK(worker->async); } -void uct_worker_slowpath_progress_register(uct_worker_h worker, - ucs_callbackq_slow_elem_t *elem) +static void uct_worker_slowpath_proxy(ucs_callbackq_slow_elem_t *self) { - ucs_callbackq_add_slow_path(&worker->progress_q, elem); + uct_worker_slowpath_elem_t *elem = ucs_derived_of(self, uct_worker_slowpath_elem_t); + elem->cb(elem->arg); } -void uct_worker_slowpath_progress_unregister(uct_worker_h worker, - ucs_callbackq_slow_elem_t *elem) +void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func, + void *arg, unsigned flags, + uct_worker_cb_id_t *id_p) { - ucs_callbackq_remove_slow_path(&worker->progress_q, elem); + uct_worker_slowpath_elem_t *elem; + + if (*id_p == NULL) { + UCS_ASYNC_BLOCK(worker->async); + + elem = ucs_malloc(sizeof(*elem), "uct_worker_slowpath_elem"); + ucs_assert_always(elem != NULL); + + elem->super.cb = uct_worker_slowpath_proxy; + elem->cb = func; + elem->arg = arg; + ucs_callbackq_add_slow_path(&worker->progress_q, &elem->super); + *id_p = elem; + + UCS_ASYNC_UNBLOCK(worker->async); + } } +void uct_worker_progress_unregister_safe(uct_worker_h worker, + uct_worker_cb_id_t *id_p) +{ + uct_worker_slowpath_elem_t *elem; + + if (*id_p != NULL) { + UCS_ASYNC_BLOCK(worker->async); + elem = *id_p; + ucs_callbackq_remove_slow_path(&worker->progress_q, &elem->super); + ucs_free(elem); + UCS_ASYNC_UNBLOCK(worker->async); + *id_p = NULL; + } +} diff --git a/src/uct/base/uct_worker.h b/src/uct/base/uct_worker.h index 30b2cf7978e..4eb7b1b7240 100644 --- a/src/uct/base/uct_worker.h +++ b/src/uct/base/uct_worker.h @@ -32,6 +32,13 @@ struct uct_worker { }; +typedef struct uct_worker_progress { + ucs_callback_t cb; + void *arg; + int refcount; +} uct_worker_progress_t; + + #define uct_worker_tl_data_get(_worker, _key, _type, _cmp_fn, _init_fn, ...) \ ({ \ uct_worker_tl_data_t *data; \ @@ -69,4 +76,12 @@ struct uct_worker { } +void uct_worker_progress_init(uct_worker_progress_t *prog); + +void uct_worker_progress_register(uct_worker_h worker, ucs_callback_t cb, + void *arg, uct_worker_progress_t *prog); + +void uct_worker_progress_unregister(uct_worker_h worker, + uct_worker_progress_t *prog); + #endif diff --git a/src/uct/ib/cm/cm.h b/src/uct/ib/cm/cm.h index feaa0490190..226d60fc6b4 100644 --- a/src/uct/ib/cm/cm.h +++ b/src/uct/ib/cm/cm.h @@ -51,8 +51,7 @@ typedef struct uct_cm_iface { ucs_queue_head_t notify_q; /* Notification queue */ uint32_t num_outstanding; /* Number of outstanding sends */ ucs_queue_head_t outstanding_q; /* Outstanding operations queue */ - ucs_callbackq_slow_elem_t cbq_elem; /* Slow-path callback */ - uint8_t cbq_elem_on; + uct_worker_cb_id_t slow_prog_id; /* Callback id for slowpath progress */ struct { int timeout_ms; diff --git a/src/uct/ib/cm/cm_iface.c b/src/uct/ib/cm/cm_iface.c index 41c109361a7..674607b4b75 100644 --- a/src/uct/ib/cm/cm_iface.c +++ b/src/uct/ib/cm/cm_iface.c @@ -37,14 +37,13 @@ static ucs_config_field_t uct_cm_iface_config_table[] = { static uct_ib_iface_ops_t uct_cm_iface_ops; -static void uct_cm_iface_progress(ucs_callbackq_slow_elem_t *arg) +static void uct_cm_iface_progress(void *arg) { uct_cm_pending_req_priv_t *priv; - uct_cm_iface_t *iface = ucs_container_of(arg, uct_cm_iface_t, cbq_elem); + uct_cm_iface_t *iface = arg; uct_cm_iface_op_t *op; uct_cm_enter(iface); - ucs_assert(iface->cbq_elem_on == 1); /* Invoke flush completions at the head of the queue - the sends which * started before them were already completed. @@ -64,9 +63,8 @@ static void uct_cm_iface_progress(ucs_callbackq_slow_elem_t *arg) if (ucs_queue_is_empty(&iface->outstanding_q) || ucs_queue_head_elem_non_empty(&iface->outstanding_q, uct_cm_iface_op_t, queue)->is_id) { - uct_worker_slowpath_progress_unregister(uct_cm_iface_worker(iface), - &iface->cbq_elem); - iface->cbq_elem_on = 0; + uct_worker_progress_unregister_safe(uct_cm_iface_worker(iface), + &iface->slow_prog_id); } uct_cm_leave(iface); @@ -235,11 +233,9 @@ static void uct_cm_iface_event_handler(int fd, void *arg) } } - if (!iface->cbq_elem_on) { - uct_worker_slowpath_progress_register(uct_cm_iface_worker(iface), - &iface->cbq_elem); - iface->cbq_elem_on = 1; - } + uct_worker_progress_register_safe(uct_cm_iface_worker(iface), + uct_cm_iface_progress, iface, 0, + &iface->slow_prog_id); } } @@ -278,8 +274,7 @@ static UCS_CLASS_INIT_FUNC(uct_cm_iface_t, uct_md_h md, uct_worker_h worker, self->config.max_outstanding = config->max_outstanding; self->config.retry_count = ucs_min(config->retry_count, UINT8_MAX); self->notify_q.head = NULL; - self->cbq_elem_on = 0; - self->cbq_elem.cb = uct_cm_iface_progress; + self->slow_prog_id = NULL; ucs_queue_head_init(&self->notify_q); ucs_queue_head_init(&self->outstanding_q); @@ -361,10 +356,8 @@ static UCS_CLASS_CLEANUP_FUNC(uct_cm_iface_t) uct_cm_iface_outstanding_purge(self); ib_cm_destroy_id(self->listen_id); ib_cm_close_device(self->cmdev); - if (self->cbq_elem_on) { - uct_worker_slowpath_progress_unregister(uct_cm_iface_worker(self), - &self->cbq_elem); - } + uct_worker_progress_unregister_safe(uct_cm_iface_worker(self), + &self->slow_prog_id); uct_cm_leave(self); /* At this point all outstanding have been removed, and no further events diff --git a/src/uct/ib/dc/accel/dc_mlx5.c b/src/uct/ib/dc/accel/dc_mlx5.c index 439abc6c901..99ae812454b 100644 --- a/src/uct/ib/dc/accel/dc_mlx5.c +++ b/src/uct/ib/dc/accel/dc_mlx5.c @@ -773,7 +773,8 @@ static UCS_CLASS_INIT_FUNC(uct_dc_mlx5_iface_t, uct_md_h md, uct_worker_h worker sizeof(struct mlx5_wqe_data_seg)); /* TODO: only register progress when we have a connection */ - uct_worker_progress_register(worker, uct_dc_mlx5_iface_progress, self); + uct_worker_progress_register(worker, uct_dc_mlx5_iface_progress, self, + &self->super.super.super.super.prog); ucs_debug("created dc iface %p", self); return UCS_OK; @@ -787,7 +788,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_mlx5_iface_t) { ucs_trace_func(""); uct_worker_progress_unregister(self->super.super.super.super.worker, - uct_dc_mlx5_iface_progress, self); + &self->super.super.super.super.prog); uct_rc_mlx5_iface_common_cleanup(&self->mlx5_common); } diff --git a/src/uct/ib/dc/verbs/dc_verbs.c b/src/uct/ib/dc/verbs/dc_verbs.c index 95b78b3ce9b..8d800482899 100644 --- a/src/uct/ib/dc/verbs/dc_verbs.c +++ b/src/uct/ib/dc/verbs/dc_verbs.c @@ -869,7 +869,8 @@ static UCS_CLASS_INIT_FUNC(uct_dc_verbs_iface_t, uct_md_h md, uct_worker_h worke } /* TODO: only register progress when we have a connection */ - uct_worker_progress_register(worker, uct_dc_verbs_iface_progress, self); + uct_worker_progress_register(worker, uct_dc_verbs_iface_progress, self, + &self->super.super.super.super.prog); ucs_debug("created dc iface %p", self); return UCS_OK; @@ -883,7 +884,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_dc_verbs_iface_t) { ucs_trace_func(""); uct_worker_progress_unregister(self->super.super.super.super.worker, - uct_dc_verbs_iface_progress, self); + &self->super.super.super.super.prog); uct_rc_verbs_iface_common_cleanup(&self->verbs_common); } diff --git a/src/uct/ib/rc/accel/rc_mlx5_ep.c b/src/uct/ib/rc/accel/rc_mlx5_ep.c index b5537982f74..cbbc21eb1e1 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_ep.c +++ b/src/uct/ib/rc/accel/rc_mlx5_ep.c @@ -441,7 +441,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, uct_iface_h tl_iface) uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max); uct_worker_progress_register(iface->super.super.super.worker, - uct_rc_mlx5_iface_progress, iface); + uct_rc_mlx5_iface_progress, iface, + &iface->super.super.super.prog); return UCS_OK; } @@ -451,7 +452,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t) uct_rc_mlx5_iface_t); uct_worker_progress_unregister(iface->super.super.super.worker, - uct_rc_mlx5_iface_progress, iface); + &iface->super.super.super.prog); uct_ib_mlx5_txwq_cleanup(iface->super.super.super.worker, &self->tx.wq); /* Modify QP to error to make HW generate CQEs for all in-progress SRQ diff --git a/src/uct/ib/rc/verbs/rc_verbs_ep.c b/src/uct/ib/rc/verbs/rc_verbs_ep.c index 2d6ad42c85a..acd5a7a1df4 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_ep.c +++ b/src/uct/ib/rc/verbs/rc_verbs_ep.c @@ -831,7 +831,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_verbs_ep_t, uct_iface_h tl_iface) uct_rc_verbs_txcnt_init(&self->txcnt); uct_worker_progress_register(iface->super.super.super.worker, - iface->progress, iface); + iface->progress, iface, + &iface->super.super.super.prog); return uct_rc_verbs_ep_tag_qp_create(iface, self); } @@ -841,7 +842,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_verbs_ep_t) uct_rc_verbs_iface_t *iface = ucs_derived_of(self->super.super.super.iface, uct_rc_verbs_iface_t); uct_worker_progress_unregister(iface->super.super.super.worker, - iface->progress, iface); + &iface->super.super.super.prog); uct_rc_verbs_ep_tag_qp_destroy(self); } diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c index 6df21b31123..8964572402b 100644 --- a/src/uct/ib/ud/base/ud_ep.c +++ b/src/uct/ib/ud/base/ud_ep.c @@ -165,7 +165,7 @@ UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface) uct_worker_progress_register(iface->super.super.worker, ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->progress, - iface); + iface, &iface->super.super.prog); /* need to remove handler from the async */ ucs_debug("worker=%p iface=%p remove async handler fd=%d", @@ -209,8 +209,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_ud_ep_t) ucs_trace_func("ep=%p id=%d conn_id=%d", self, self->ep_id, self->conn_id); uct_worker_progress_unregister(iface->super.super.worker, - ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->progress, - iface); + &iface->super.super.prog); ucs_wtimer_remove(&self->slow_timer); uct_ud_iface_remove_ep(iface, self); diff --git a/src/uct/sm/mm/mm_ep.c b/src/uct/sm/mm/mm_ep.c index 41c9aa990df..3cf28987a5c 100644 --- a/src/uct/sm/mm/mm_ep.c +++ b/src/uct/sm/mm/mm_ep.c @@ -25,19 +25,12 @@ void uct_mm_ep_connected(uct_mm_ep_t *ep) ep->cached_tail = ep->fifo_ctl->tail; } -static void -uct_mm_ep_signal_remote_slow_path_callback(ucs_callbackq_slow_elem_t *self) +static void uct_mm_ep_signal_remote_slow_path_callback(void *arg) { - uct_mm_ep_t *ep = ucs_container_of(self, uct_mm_ep_t, cbq_elem); - + uct_mm_ep_t *ep = arg; uct_mm_ep_signal_remote(ep, UCT_MM_IFACE_SIGNAL_CONNECT); } -void uct_mm_ep_remove_slow_path_callback(uct_mm_iface_t *iface, uct_mm_ep_t *ep) -{ - uct_worker_slowpath_progress_unregister(iface->super.worker, &ep->cbq_elem); - ep->cbq_elem_on = 0; -} /* send a signal to remote interface using Unix-domain socket */ static ucs_status_t @@ -57,9 +50,7 @@ uct_mm_ep_signal_remote(uct_mm_ep_t *ep, uct_mm_iface_conn_signal_t sig) ucs_debug("Sent connect from socket %d to %p", iface->signal_fd, (const struct sockaddr*)&ep->cached_signal_sockaddr); - if (ep->cbq_elem_on) { - uct_mm_ep_remove_slow_path_callback(iface, ep); - } + uct_worker_progress_unregister_safe(iface->super.worker, &ep->slow_cb_id); /* point the ep->fifo_ctl to the remote fifo */ uct_mm_ep_connected(ep); @@ -78,10 +69,10 @@ uct_mm_ep_signal_remote(uct_mm_ep_t *ep, uct_mm_iface_conn_signal_t sig) * prevents the reading of incoming messages which blocks the remote sender. * Add the sending attempt as a callback to a slow progress. */ - if ((!ep->cbq_elem_on) && (sig == UCT_MM_IFACE_SIGNAL_CONNECT)) { - ep->cbq_elem.cb = uct_mm_ep_signal_remote_slow_path_callback; - uct_worker_slowpath_progress_register(iface->super.worker, &ep->cbq_elem); - ep->cbq_elem_on = 1; + if (sig == UCT_MM_IFACE_SIGNAL_CONNECT) { + uct_worker_progress_register_safe(iface->super.worker, + uct_mm_ep_signal_remote_slow_path_callback, + ep, 0, &ep->slow_cb_id); } /* Return UCS_OK in this case even though couldn't send, so that the @@ -144,7 +135,7 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, uct_iface_t *tl_iface, self->cached_signal_addrlen = remote_fifo_ctl->signal_addrlen; self->cached_signal_sockaddr = remote_fifo_ctl->signal_sockaddr; - self->cbq_elem_on = 0; + self->slow_cb_id = NULL; /* Send connect message to remote side so it will start polling */ status = uct_mm_ep_signal_remote(self, UCT_MM_IFACE_SIGNAL_CONNECT); @@ -161,7 +152,7 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, uct_iface_t *tl_iface, /* Register for send side progress */ uct_worker_progress_register(iface->super.worker, uct_mm_iface_progress, - iface); + iface, &iface->super.prog); ucs_debug("mm: ep connected: %p, to remote_shmid: %zu", self, addr->id); @@ -179,13 +170,9 @@ static UCS_CLASS_CLEANUP_FUNC(uct_mm_ep_t) * from progressing and reading incoming messages */ /* make sure the slow path function isn't invoked after the ep's cleanup */ - if (self->cbq_elem_on) { - ucs_debug("Removing a remaining slow path progress function."); - uct_mm_ep_remove_slow_path_callback(iface, self); - } + uct_worker_progress_unregister_safe(iface->super.worker, &self->slow_cb_id); - uct_worker_progress_unregister(iface->super.worker, uct_mm_iface_progress, - iface); + uct_worker_progress_unregister(iface->super.worker, &iface->super.prog); for (remote_seg = sglib_hashed_uct_mm_remote_seg_t_it_init(&iter, self->remote_segments_hash); remote_seg != NULL; remote_seg = sglib_hashed_uct_mm_remote_seg_t_it_next(&iter)) { diff --git a/src/uct/sm/mm/mm_ep.h b/src/uct/sm/mm/mm_ep.h index 74323e1d4dc..eb5115f61ce 100644 --- a/src/uct/sm/mm/mm_ep.h +++ b/src/uct/sm/mm/mm_ep.h @@ -36,8 +36,7 @@ struct uct_mm_ep { socklen_t cached_signal_addrlen; /* cached address length of signaling socket */ struct sockaddr_un cached_signal_sockaddr; /* cached address of signaling socket */ - ucs_callbackq_slow_elem_t cbq_elem; /* Slow-path callback */ - uint8_t cbq_elem_on; + uct_worker_cb_id_t slow_cb_id; /* Slow-path callback */ /* Remote peer */ uct_mm_remote_seg_t mapped_desc; /* pointer to the descriptor of the destination's shared_mem (FIFO) */ diff --git a/src/uct/sm/mm/mm_iface.c b/src/uct/sm/mm/mm_iface.c index 5be26f9a485..44be7c103f6 100644 --- a/src/uct/sm/mm/mm_iface.c +++ b/src/uct/sm/mm/mm_iface.c @@ -403,8 +403,12 @@ static void uct_mm_iface_recv_messages(uct_mm_iface_t *iface) ret = recvfrom(iface->signal_fd, &sig, sizeof(sig), 0, NULL, 0); if (ret == sizeof(sig)) { ucs_debug("mm_iface %p: got connect message", iface); - ucs_callbackq_add_safe(&iface->super.worker->progress_q, - uct_mm_iface_progress, iface); + if (iface->super.prog.refcount++ == 0) { + iface->super.prog.cb = uct_mm_iface_progress; + iface->super.prog.arg = iface; + ucs_callbackq_add_safe(&iface->super.worker->progress_q, + uct_mm_iface_progress, iface); + } } else { if (ret < 0) { if (errno != EAGAIN) { @@ -566,8 +570,10 @@ static UCS_CLASS_CLEANUP_FUNC(uct_mm_iface_t) ucs_async_remove_handler(self->signal_fd, 1); - ucs_callbackq_remove_all(&self->super.worker->progress_q, - uct_mm_iface_progress, self); + if (self->super.prog.refcount) { + self->super.prog.refcount = 1; + uct_worker_progress_unregister(self->super.worker, &self->super.prog); + } /* return all the descriptors that are now 'assigned' to the FIFO, * to their mpool */ diff --git a/src/uct/ugni/rdma/ugni_rdma_iface.c b/src/uct/ugni/rdma/ugni_rdma_iface.c index 088bfd6f754..5e56bb66f64 100644 --- a/src/uct/ugni/rdma/ugni_rdma_iface.c +++ b/src/uct/ugni/rdma/ugni_rdma_iface.c @@ -150,7 +150,7 @@ void uct_ugni_progress(void *arg) static UCS_CLASS_CLEANUP_FUNC(uct_ugni_rdma_iface_t) { uct_worker_progress_unregister(self->super.super.worker, - uct_ugni_progress, self); + &self->super.super.prog); ucs_mpool_cleanup(&self->free_desc_get_buffer, 1); ucs_mpool_cleanup(&self->free_desc_get, 1); ucs_mpool_cleanup(&self->free_desc_famo, 1); @@ -325,7 +325,8 @@ static UCS_CLASS_INIT_FUNC(uct_ugni_rdma_iface_t, uct_md_h md, uct_worker_h work /* TBD: eventually the uct_ugni_progress has to be moved to * rdma layer so each ugni layer will have own progress */ - uct_worker_progress_register(worker, uct_ugni_progress, self); + uct_worker_progress_register(worker, uct_ugni_progress, self, + &self->super.super.prog); return UCS_OK; clean_famo: diff --git a/src/uct/ugni/smsg/ugni_smsg_iface.c b/src/uct/ugni/smsg/ugni_smsg_iface.c index 4a714b6c9a7..4c7d84c5852 100644 --- a/src/uct/ugni/smsg/ugni_smsg_iface.c +++ b/src/uct/ugni/smsg/ugni_smsg_iface.c @@ -217,7 +217,7 @@ static ucs_status_t uct_ugni_smsg_iface_query(uct_iface_h tl_iface, uct_iface_at static UCS_CLASS_CLEANUP_FUNC(uct_ugni_smsg_iface_t) { uct_worker_progress_unregister(self->super.super.worker, - uct_ugni_smsg_progress, self); + &self->super.super.prog); ucs_mpool_cleanup(&self->free_desc, 1); ucs_mpool_cleanup(&self->free_mbox, 1); uct_ugni_destroy_cq(self->remote_cq, &self->super.cdm); @@ -338,7 +338,8 @@ static UCS_CLASS_INIT_FUNC(uct_ugni_smsg_iface_t, uct_md_h md, uct_worker_h work /* TBD: eventually the uct_ugni_progress has to be moved to * udt layer so each ugni layer will have own progress */ - uct_worker_progress_register(worker, uct_ugni_smsg_progress, self); + uct_worker_progress_register(worker, uct_ugni_smsg_progress, self, + &self->super.super.prog); return UCS_OK; diff --git a/src/uct/ugni/udt/ugni_udt_ep.c b/src/uct/ugni/udt/ugni_udt_ep.c index 536b39b450b..af41951c09d 100644 --- a/src/uct/ugni/udt/ugni_udt_ep.c +++ b/src/uct/ugni/udt/ugni_udt_ep.c @@ -17,7 +17,8 @@ ucs_status_t uct_ugni_udt_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n) ucs_status_t status = uct_ugni_ep_pending_add(tl_ep, n); if (UCS_OK == status) { - uct_worker_progress_register(iface->super.worker, uct_ugni_udt_progress, iface); + uct_worker_progress_register(iface->super.worker, uct_ugni_udt_progress, + iface, &iface->super.prog); } return status; } @@ -32,7 +33,7 @@ ucs_arbiter_cb_result_t uct_ugni_udt_ep_process_pending(ucs_arbiter_t *arbiter, result = uct_ugni_ep_process_pending(arbiter, elem, arg); if (UCS_ARBITER_CB_RESULT_REMOVE_ELEM == result) { - uct_worker_progress_unregister(iface->super.worker, uct_ugni_udt_progress, iface); + uct_worker_progress_unregister(iface->super.worker, &iface->super.prog); } return result; } @@ -47,7 +48,7 @@ static ucs_arbiter_cb_result_t uct_ugni_udt_ep_abriter_purge_cb(ucs_arbiter_t *a result = uct_ugni_ep_abriter_purge_cb(arbiter, elem, arg); if (UCS_ARBITER_CB_RESULT_REMOVE_ELEM == result) { - uct_worker_progress_unregister(iface->super.worker, uct_ugni_udt_progress, iface); + uct_worker_progress_unregister(iface->super.worker, &iface->super.prog); } return result; } From e290cb084dd15d5f5259f34e9220771db50e2dff Mon Sep 17 00:00:00 2001 From: Yossi Itigin Date: Wed, 14 Jun 2017 13:23:52 +0300 Subject: [PATCH 2/2] UCT/API: Update documentation for progress register/unregister. --- src/uct/api/uct.h | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/uct/api/uct.h b/src/uct/api/uct.h index 5000bb902f5..56536041433 100644 --- a/src/uct/api/uct.h +++ b/src/uct/api/uct.h @@ -773,16 +773,20 @@ void uct_worker_progress(uct_worker_h worker); * @ingroup UCT_CONTEXT * @brief Add a slow path callback function to a worker progress. * - * Add a function which will be called every time a progress is made on the worker. - * The number of functions which can be added this way is unlimited since the - * element is allocated by the caller, but the overhead of calling this function - * is slightly higher than @ref uct_worker_progress_register. + * If 'id_p' points to a NULL identifier, this function will add a callback which + * will be called every time a progress is made on the worker. * - * @param [in] worker Handle to worker. - * @param [in] func Pointer to callback function. - * @param [in] arg Argument to the function. - * @param [in] flags Callback flags, see @ref ucs_callbackq_flags - * @param [inout] id_p Filled with callback ID. + * @param [in] worker Handle to the worker whose progress should invoke + * the callback. + * @param [in] func Pointer to the callback function. + * @param [in] arg Argument for the callback function. + * @param [in] flags Callback flags, see @ref ucs_callbackq_flags. + * @param [inout] id_p Filled with callback identifier, which can be + * used to remove the callback. If *id_p is non-NULL, + * no operation will be performed and *id_p will be + * left unchanged. + * + * @note This function is thread safe. */ void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func, void *arg, unsigned flags, @@ -793,11 +797,17 @@ void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func, * @ingroup UCT_CONTEXT * @brief Remove a slow path callback function from worker's progress. * - * Remove a function previously added by @ref uct_worker_slowpath_progress_register. + * If 'id_p' points to a non-NULL handle, remove a callback which was previously + * added by @ref uct_worker_progress_register_safe. + * + * @param [in] worker Handle to the worker whose progress should invoke + * the callback. + * @param [inout] id_p Points to callback identifier to remove, and filled + * with NULL by this function. If *id_p already points + * to NULL, no operation will be performed and *id_p + * will be left unchanged. * - * @param [in] worker Handle to worker. - * @param [inout] id_p Callback ID to remove, after a call to this - * function it's set to UCS_CALLBACKQ_ID_NULL. + * @note This function is thread safe. */ void uct_worker_progress_unregister_safe(uct_worker_h worker, uct_worker_cb_id_t *id_p);