Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/API/UCP: Hide slow-path progress elem and refcount in uct_worker. #1583

Merged
merged 3 commits into from
Jun 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,16 +330,13 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
static void ucp_ep_flush_slow_path_remove(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;
if (req->send.flush.cbq_elem_on) {
uct_worker_slowpath_progress_unregister(ep->worker->uct,
&req->send.flush.cbq_elem);
req->send.flush.cbq_elem_on = 0;
}
uct_worker_progress_unregister_safe(ep->worker->uct,
&req->send.flush.slow_cb_id);
}

static void ucp_ep_flushed_slow_path_callback(ucs_callbackq_slow_elem_t *self)
static void ucp_ep_flushed_slow_path_callback(void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.flush.cbq_elem);
ucp_request_t *req = arg;
ucp_ep_h ep = req->send.ep;

ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));
Expand All @@ -365,16 +362,15 @@ static int ucp_flush_check_completion(ucp_request_t *req)

ucs_trace("adding slow-path callback to destroy ep %p", ep);
ucp_ep_flush_slow_path_remove(req);
req->send.flush.cbq_elem.cb = ucp_ep_flushed_slow_path_callback;
req->send.flush.cbq_elem_on = 1;
uct_worker_slowpath_progress_register(ep->worker->uct,
&req->send.flush.cbq_elem);
uct_worker_progress_register_safe(ep->worker->uct,
ucp_ep_flushed_slow_path_callback, req, 0,
&req->send.flush.slow_cb_id);
return 1;
}

static void ucp_ep_flush_resume_slow_path_callback(ucs_callbackq_slow_elem_t *self)
static void ucp_ep_flush_resume_slow_path_callback(void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.flush.cbq_elem);
ucp_request_t *req = arg;

ucp_ep_flush_slow_path_remove(req);
ucp_ep_flush_progress(req);
Expand Down Expand Up @@ -405,12 +401,11 @@ static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
completed = ucp_flush_check_completion(req);

/* If the operation has not completed, add slow-path progress to resume */
if (!completed && req->send.flush.lanes && !req->send.flush.cbq_elem_on) {
if (!completed && req->send.flush.lanes) {
ucs_trace("ep %p: adding slow-path callback to resume flush", ep);
req->send.flush.cbq_elem.cb = ucp_ep_flush_resume_slow_path_callback;
req->send.flush.cbq_elem_on = 1;
uct_worker_slowpath_progress_register(ep->worker->uct,
&req->send.flush.cbq_elem);
uct_worker_progress_register_safe(ep->worker->uct,
ucp_ep_flush_resume_slow_path_callback,
req, 0, &req->send.flush.slow_cb_id);
}

if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
Expand Down Expand Up @@ -520,8 +515,7 @@ static ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep)
req->send.ep = ep;
req->send.flush.flushed_cb = ucp_ep_flushed_callback;
req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep));
req->send.flush.cbq_elem.cb = ucp_ep_flushed_slow_path_callback;
req->send.flush.cbq_elem_on = 0;
req->send.flush.slow_cb_id = NULL;
req->send.lane = UCP_NULL_LANE;
req->send.uct.func = ucp_ep_flush_progress_pending;
req->send.uct_comp.func = ucp_ep_flush_completion;
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ struct ucp_request {

struct {
ucp_request_callback_t flushed_cb;/* Called when flushed */
ucs_callbackq_slow_elem_t cbq_elem; /* Slow-path callback */
uint8_t cbq_elem_on;
uct_worker_cb_id_t slow_cb_id;/* Slow-path callback */
ucp_lane_map_t lanes; /* Which lanes need to be flushed */
} flush;

struct {
uint64_t remote_addr; /* Remote address */
ucp_atomic_fetch_op_t op; /* Requested AMO */
Expand Down
16 changes: 7 additions & 9 deletions src/ucp/wireup/stub_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ static ucs_status_t ucp_stub_ep_connect_to_ep(uct_ep_h uct_ep,
* We switch the endpoint in this function (instead in wireup code) since
* this is guaranteed to run from the main thread.
*/
static void ucp_stub_ep_progress(ucs_callbackq_slow_elem_t *elem)
static void ucp_stub_ep_progress(void *arg)
{
ucp_stub_ep_t *stub_ep = ucs_container_of(elem, ucp_stub_ep_t, elem);
ucp_stub_ep_t *stub_ep = arg;
ucp_ep_h ep = stub_ep->ep;
ucs_queue_head_t tmp_pending_queue;
uct_pending_req_t *uct_req;
Expand Down Expand Up @@ -313,7 +313,7 @@ UCS_CLASS_INIT_FUNC(ucp_stub_ep_t, ucp_ep_h ep)
self->aux_rsc_index = UCP_NULL_RESOURCE;
self->pending_count = 0;
self->flags = 0;
self->elem.cb = ucp_stub_ep_progress;
self->progress_id = NULL;
ucs_queue_head_init(&self->pending_q);
ucs_trace("ep %p: created stub ep %p to %s ", ep, self, ucp_ep_peer_name(ep));
return UCS_OK;
Expand All @@ -326,10 +326,7 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_stub_ep_t)

ucs_debug("ep %p: destroy stub ep %p", self->ep, self);

if (self->flags & UCP_STUB_EP_FLAG_READY) {
uct_worker_slowpath_progress_unregister(self->ep->worker->uct,
&self->elem);
}
uct_worker_progress_unregister_safe(self->ep->worker->uct, &self->progress_id);
if (self->aux_ep != NULL) {
uct_ep_destroy(self->aux_ep);
}
Expand Down Expand Up @@ -409,8 +406,9 @@ void ucp_stub_ep_remote_connected(uct_ep_h uct_ep)

ucs_trace("ep %p: stub ep %p is remote-connected", stub_ep->ep, stub_ep);
stub_ep->flags |= UCP_STUB_EP_FLAG_READY;
uct_worker_slowpath_progress_register(stub_ep->ep->worker->uct,
&stub_ep->elem);
uct_worker_progress_register_safe(stub_ep->ep->worker->uct,
ucp_stub_ep_progress, stub_ep, 0,
&stub_ep->progress_id);
}

int ucp_stub_ep_test(uct_ep_h uct_ep)
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/wireup/stub_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct ucp_stub_ep {
ucp_rsc_index_t aux_rsc_index; /**< Index of auxiliary transport */
volatile uint32_t pending_count; /**< Number of pending wireup operations */
volatile uint32_t flags; /**< Connection state flags */
ucs_callbackq_slow_elem_t elem; /**< Slow-path callback */
uct_worker_cb_id_t progress_id; /**< ID of progress function */
};


Expand Down
1 change: 1 addition & 0 deletions src/ucs/async/signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <ucs/datastruct/list.h>
#include <ucs/type/status.h>
#include <ucs/sys/sys.h> /* for ucs_get_tid() */
#include <pthread.h>


Expand Down
78 changes: 26 additions & 52 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,74 +769,48 @@ void uct_worker_destroy(uct_worker_h worker);
void uct_worker_progress(uct_worker_h worker);


/**
* @ingroup UCT_CONTEXT
* @brief Add a callback function to a worker progress.
*
* Add a function which will be called every time a progress is made on the worker.
*
* @param [in] worker Handle to worker.
* @param [in] func Pointer to callback function.
* @param [in] arg Argument to the function.
*
* @note If the same function and argument are already on the list, their reference
* count will be incremented.
* @note This operation could potentially be slow.
*/
void uct_worker_progress_register(uct_worker_h worker,
ucs_callback_t func, void *arg);


/**
* @ingroup UCT_CONTEXT
* @brief Remove a callback function from worker's progress.
*
* Remove a previously added function from worker's progress.
*
* @param [in] worker Handle to worker.
* @param [in] func Pointer to callback function.
* @param [in] arg Argument to the function.
*
* @note If the reference count of the function is >1, it will be decremented and
* the function will not be removed.
* @note This operation could potentially be slow.
*/
void uct_worker_progress_unregister(uct_worker_h worker,
ucs_callback_t func, void *arg);


/**
* @ingroup UCT_CONTEXT
* @brief Add a slow path callback function to a worker progress.
*
* Add a function which will be called every time a progress is made on the worker.
* The number of functions which can be added this way is unlimited since the
* element is allocated by the caller, but the overhead of calling this function
* is slightly higher than @ref uct_worker_progress_register.
* If 'id_p' points to a NULL identifier, this function will add a callback which
* will be called every time a progress is made on the worker.
*
* @param [in] worker Handle to worker.
* @param [in] elem Callback function to add, with it's associated context.
* @param [in] worker Handle to the worker whose progress should invoke
* the callback.
* @param [in] func Pointer to the callback function.
* @param [in] arg Argument for the callback function.
* @param [in] flags Callback flags, see @ref ucs_callbackq_flags.
* @param [inout] id_p Filled with callback identifier, which can be
* used to remove the callback. If *id_p is non-NULL,
* no operation will be performed and *id_p will be
* left unchanged.
*
* @note This operation could potentially be slow.
* @note This function is thread safe.
*/
void uct_worker_slowpath_progress_register(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem);
void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func,
void *arg, unsigned flags,
uct_worker_cb_id_t *id_p);


/**
* @ingroup UCT_CONTEXT
* @brief Remove a slow path callback function from worker's progress.
*
* Remove a function previously added by @ref uct_worker_slowpath_progress_register.
* If 'id_p' points to a non-NULL handle, remove a callback which was previously
* added by @ref uct_worker_progress_register_safe.
*
* @param [in] worker Handle to worker.
* @param [in] elem Callback element to remove. Must be the same pointer
* added earlier.
* @param [in] worker Handle to the worker whose progress should invoke
* the callback.
* @param [inout] id_p Points to callback identifier to remove, and filled
* with NULL by this function. If *id_p already points
* to NULL, no operation will be performed and *id_p
* will be left unchanged.
*
* @note This operation could potentially be slow.
* @note This function is thread safe.
*/
void uct_worker_slowpath_progress_unregister(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem);
void uct_worker_progress_unregister_safe(uct_worker_h worker,
uct_worker_cb_id_t *id_p);


/**
Expand Down
1 change: 1 addition & 0 deletions src/uct/api/uct_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ typedef struct uct_iface_addr uct_iface_addr_t;
typedef struct uct_ep_addr uct_ep_addr_t;
typedef struct uct_tag_context uct_tag_context_t;
typedef uint64_t uct_tag_t; /* tag type - 64 bit */
typedef void* uct_worker_cb_id_t;
/**
* @}
*/
Expand Down
1 change: 1 addition & 0 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops, uct_md_h md,
self->am_tracer_arg = NULL;
self->err_handler = params->err_handler;
self->err_handler_arg = params->err_handler_arg;
uct_worker_progress_init(&self->prog);

for (id = 0; id < UCT_AM_ID_MAX; ++id) {
uct_iface_set_stub_am_handler(self, id);
Expand Down
3 changes: 3 additions & 0 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#ifndef UCT_IFACE_H_
#define UCT_IFACE_H_

#include "uct_worker.h"

#include <uct/api/uct.h>
#include <ucs/config/parser.h>
#include <ucs/datastruct/mpool.h>
Expand Down Expand Up @@ -178,6 +180,7 @@ typedef struct uct_base_iface {
uct_iface_t super;
uct_md_h md; /* MD this interface is using */
uct_worker_h worker; /* Worker this interface is on */
uct_worker_progress_t prog;
UCS_STATS_NODE_DECLARE(stats); /* Statistics */
uct_am_handler_t am[UCT_AM_ID_MAX]; /* Active message table */
uct_am_tracer_t am_tracer; /* Active message tracer */
Expand Down
80 changes: 68 additions & 12 deletions src/uct/base/uct_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
#include "uct_worker.h"

#include <ucs/type/class.h>
#include <ucs/async/async.h>
#include <ucs/datastruct/callbackq.inl>


typedef struct uct_worker_slowpath_elem {
ucs_callbackq_slow_elem_t super;
ucs_callback_t cb;
void *arg;
} uct_worker_slowpath_elem_t;


static UCS_CLASS_INIT_FUNC(uct_worker_t, ucs_async_context_t *async,
ucs_thread_mode_t thread_mode)
{
self->async = async;
self->thread_mode = thread_mode;
ucs_callbackq_init(&self->progress_q, 64, async);
ucs_callbackq_init(&self->progress_q, 64, NULL);
ucs_list_head_init(&self->tl_data);
return UCS_OK;
}
Expand All @@ -37,27 +45,75 @@ void uct_worker_progress(uct_worker_h worker)
ucs_callbackq_dispatch(&worker->progress_q);
}

void uct_worker_progress_register(uct_worker_h worker,
ucs_callback_t func, void *arg)
void uct_worker_progress_init(uct_worker_progress_t *prog)
{
prog->cb = NULL;
prog->arg = NULL;
prog->refcount = 0;
}

void uct_worker_progress_register(uct_worker_h worker, ucs_callback_t cb,
void *arg, uct_worker_progress_t *prog)
{
ucs_callbackq_add(&worker->progress_q, func, arg);
UCS_ASYNC_BLOCK(worker->async);
if (prog->refcount++ == 0) {
prog->cb = cb;
prog->arg = arg;
ucs_callbackq_add(&worker->progress_q, cb, arg);
}
UCS_ASYNC_UNBLOCK(worker->async);
}

void uct_worker_progress_unregister(uct_worker_h worker,
ucs_callback_t func, void *arg)
uct_worker_progress_t *prog)
{
ucs_callbackq_remove(&worker->progress_q, func, arg);
UCS_ASYNC_BLOCK(worker->async);
ucs_assert(prog->refcount > 0);
if (--prog->refcount == 0) {
ucs_callbackq_remove(&worker->progress_q, prog->cb, prog->arg);
}
UCS_ASYNC_UNBLOCK(worker->async);
}

void uct_worker_slowpath_progress_register(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem)
static void uct_worker_slowpath_proxy(ucs_callbackq_slow_elem_t *self)
{
ucs_callbackq_add_slow_path(&worker->progress_q, elem);
uct_worker_slowpath_elem_t *elem = ucs_derived_of(self, uct_worker_slowpath_elem_t);
elem->cb(elem->arg);
}

void uct_worker_slowpath_progress_unregister(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem)
void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func,
void *arg, unsigned flags,
uct_worker_cb_id_t *id_p)
{
ucs_callbackq_remove_slow_path(&worker->progress_q, elem);
uct_worker_slowpath_elem_t *elem;

if (*id_p == NULL) {
UCS_ASYNC_BLOCK(worker->async);

elem = ucs_malloc(sizeof(*elem), "uct_worker_slowpath_elem");
ucs_assert_always(elem != NULL);

elem->super.cb = uct_worker_slowpath_proxy;
elem->cb = func;
elem->arg = arg;
ucs_callbackq_add_slow_path(&worker->progress_q, &elem->super);
*id_p = elem;

UCS_ASYNC_UNBLOCK(worker->async);
}
}

void uct_worker_progress_unregister_safe(uct_worker_h worker,
uct_worker_cb_id_t *id_p)
{
uct_worker_slowpath_elem_t *elem;

if (*id_p != NULL) {
UCS_ASYNC_BLOCK(worker->async);
elem = *id_p;
ucs_callbackq_remove_slow_path(&worker->progress_q, &elem->super);
ucs_free(elem);
UCS_ASYNC_UNBLOCK(worker->async);
*id_p = NULL;
}
}
Loading