Skip to content

Commit

Permalink
UCT/API/UCP: Hide slow-path progress elem and refcount in uct_worker.
Browse files Browse the repository at this point in the history
This commit does some preparations in UCT for the change in callback
queue API:
+ Block async context outside of callback queue calls.
+ Keep callback reference count in a handle in uct_base_iface_t.
+ Instead of exposing the slow-path callback queue element in the UCT API,
  hide it inside the uct worker code, which allocates the element internally
  and releases it when finished.
  • Loading branch information
yosefe committed Jun 6, 2017
1 parent 78fb0dd commit 58bf758
Show file tree
Hide file tree
Showing 24 changed files with 185 additions and 161 deletions.
34 changes: 14 additions & 20 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,16 +330,13 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
static void ucp_ep_flush_slow_path_remove(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;
if (req->send.flush.cbq_elem_on) {
uct_worker_slowpath_progress_unregister(ep->worker->uct,
&req->send.flush.cbq_elem);
req->send.flush.cbq_elem_on = 0;
}
uct_worker_progress_unregister_safe(ep->worker->uct,
&req->send.flush.slow_cb_id);
}

static void ucp_ep_flushed_slow_path_callback(ucs_callbackq_slow_elem_t *self)
static void ucp_ep_flushed_slow_path_callback(void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.flush.cbq_elem);
ucp_request_t *req = arg;
ucp_ep_h ep = req->send.ep;

ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));
Expand All @@ -365,16 +362,15 @@ static int ucp_flush_check_completion(ucp_request_t *req)

ucs_trace("adding slow-path callback to destroy ep %p", ep);
ucp_ep_flush_slow_path_remove(req);
req->send.flush.cbq_elem.cb = ucp_ep_flushed_slow_path_callback;
req->send.flush.cbq_elem_on = 1;
uct_worker_slowpath_progress_register(ep->worker->uct,
&req->send.flush.cbq_elem);
uct_worker_progress_register_safe(ep->worker->uct,
ucp_ep_flushed_slow_path_callback, req, 0,
&req->send.flush.slow_cb_id);
return 1;
}

static void ucp_ep_flush_resume_slow_path_callback(ucs_callbackq_slow_elem_t *self)
static void ucp_ep_flush_resume_slow_path_callback(void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.flush.cbq_elem);
ucp_request_t *req = arg;

ucp_ep_flush_slow_path_remove(req);
ucp_ep_flush_progress(req);
Expand Down Expand Up @@ -405,12 +401,11 @@ static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
completed = ucp_flush_check_completion(req);

/* If the operation has not completed, add slow-path progress to resume */
if (!completed && req->send.flush.lanes && !req->send.flush.cbq_elem_on) {
if (!completed && req->send.flush.lanes) {
ucs_trace("ep %p: adding slow-path callback to resume flush", ep);
req->send.flush.cbq_elem.cb = ucp_ep_flush_resume_slow_path_callback;
req->send.flush.cbq_elem_on = 1;
uct_worker_slowpath_progress_register(ep->worker->uct,
&req->send.flush.cbq_elem);
uct_worker_progress_register_safe(ep->worker->uct,
ucp_ep_flush_resume_slow_path_callback,
req, 0, &req->send.flush.slow_cb_id);
}

if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
Expand Down Expand Up @@ -520,8 +515,7 @@ static ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep)
req->send.ep = ep;
req->send.flush.flushed_cb = ucp_ep_flushed_callback;
req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep));
req->send.flush.cbq_elem.cb = ucp_ep_flushed_slow_path_callback;
req->send.flush.cbq_elem_on = 0;
req->send.flush.slow_cb_id = NULL;
req->send.lane = UCP_NULL_LANE;
req->send.uct.func = ucp_ep_flush_progress_pending;
req->send.uct_comp.func = ucp_ep_flush_completion;
Expand Down
7 changes: 4 additions & 3 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,22 @@ struct ucp_request {

struct {
ucp_request_callback_t flushed_cb;/* Called when flushed */
ucs_callbackq_slow_elem_t cbq_elem; /* Slow-path callback */
uint8_t cbq_elem_on;
uct_worker_cb_id_t slow_cb_id;/* Slow-path callback */
ucp_lane_map_t lanes; /* Which lanes need to be flushed */
} flush;

struct {
uint64_t remote_addr; /* Remote address */
ucp_atomic_fetch_op_t op; /* Requested AMO */
ucp_rkey_h rkey; /* Remote memory key */
uint64_t value;
void *result;
} amo;

struct {
void *rndv_op; /* Handler of issued rndv send. Needs to cancel the
operation if it is completed by SW */
} tag_offload;
} tag_offload;

};

Expand Down
16 changes: 7 additions & 9 deletions src/ucp/wireup/stub_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ static ucs_status_t ucp_stub_ep_connect_to_ep(uct_ep_h uct_ep,
* We switch the endpoint in this function (instead in wireup code) since
* this is guaranteed to run from the main thread.
*/
static void ucp_stub_ep_progress(ucs_callbackq_slow_elem_t *elem)
static void ucp_stub_ep_progress(void *arg)
{
ucp_stub_ep_t *stub_ep = ucs_container_of(elem, ucp_stub_ep_t, elem);
ucp_stub_ep_t *stub_ep = arg;
ucp_ep_h ep = stub_ep->ep;
ucs_queue_head_t tmp_pending_queue;
uct_pending_req_t *uct_req;
Expand Down Expand Up @@ -313,7 +313,7 @@ UCS_CLASS_INIT_FUNC(ucp_stub_ep_t, ucp_ep_h ep)
self->aux_rsc_index = UCP_NULL_RESOURCE;
self->pending_count = 0;
self->flags = 0;
self->elem.cb = ucp_stub_ep_progress;
self->progress_id = NULL;
ucs_queue_head_init(&self->pending_q);
ucs_trace("ep %p: created stub ep %p to %s ", ep, self, ucp_ep_peer_name(ep));
return UCS_OK;
Expand All @@ -326,10 +326,7 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_stub_ep_t)

ucs_debug("ep %p: destroy stub ep %p", self->ep, self);

if (self->flags & UCP_STUB_EP_FLAG_READY) {
uct_worker_slowpath_progress_unregister(self->ep->worker->uct,
&self->elem);
}
uct_worker_progress_unregister_safe(self->ep->worker->uct, &self->progress_id);
if (self->aux_ep != NULL) {
uct_ep_destroy(self->aux_ep);
}
Expand Down Expand Up @@ -409,8 +406,9 @@ void ucp_stub_ep_remote_connected(uct_ep_h uct_ep)

ucs_trace("ep %p: stub ep %p is remote-connected", stub_ep->ep, stub_ep);
stub_ep->flags |= UCP_STUB_EP_FLAG_READY;
uct_worker_slowpath_progress_register(stub_ep->ep->worker->uct,
&stub_ep->elem);
uct_worker_progress_register_safe(stub_ep->ep->worker->uct,
ucp_stub_ep_progress, stub_ep, 0,
&stub_ep->progress_id);
}

int ucp_stub_ep_test(uct_ep_h uct_ep)
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/wireup/stub_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct ucp_stub_ep {
ucp_rsc_index_t aux_rsc_index; /**< Index of auxiliary transport */
volatile uint32_t pending_count; /**< Number of pending wireup operations */
volatile uint32_t flags; /**< Connection state flags */
ucs_callbackq_slow_elem_t elem; /**< Slow-path callback */
uct_worker_cb_id_t progress_id; /**< ID of progress function */
};


Expand Down
1 change: 1 addition & 0 deletions src/ucs/async/signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <ucs/datastruct/list.h>
#include <ucs/type/status.h>
#include <ucs/sys/sys.h> /* for ucs_get_tid() */
#include <pthread.h>


Expand Down
62 changes: 13 additions & 49 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -723,42 +723,6 @@ void uct_worker_destroy(uct_worker_h worker);
void uct_worker_progress(uct_worker_h worker);


/**
* @ingroup UCT_CONTEXT
* @brief Add a callback function to a worker progress.
*
* Add a function which will be called every time a progress is made on the worker.
*
* @param [in] worker Handle to worker.
* @param [in] func Pointer to callback function.
* @param [in] arg Argument to the function.
*
* @note If the same function and argument are already on the list, their reference
* count will be incremented.
* @note This operation could potentially be slow.
*/
void uct_worker_progress_register(uct_worker_h worker,
ucs_callback_t func, void *arg);


/**
* @ingroup UCT_CONTEXT
* @brief Remove a callback function from worker's progress.
*
* Remove a previously added function from worker's progress.
*
* @param [in] worker Handle to worker.
* @param [in] func Pointer to callback function.
* @param [in] arg Argument to the function.
*
* @note If the reference count of the function is >1, it will be decremented and
* the function will not be removed.
* @note This operation could potentially be slow.
*/
void uct_worker_progress_unregister(uct_worker_h worker,
ucs_callback_t func, void *arg);


/**
* @ingroup UCT_CONTEXT
* @brief Add a slow path callback function to a worker progress.
Expand All @@ -768,13 +732,15 @@ void uct_worker_progress_unregister(uct_worker_h worker,
* element is allocated by the caller, but the overhead of calling this function
* is slightly higher than @ref uct_worker_progress_register.
*
* @param [in] worker Handle to worker.
* @param [in] elem Callback function to add, with it's associated context.
*
* @note This operation could potentially be slow.
* @param [in] worker Handle to worker.
* @param [in] func Pointer to callback function.
* @param [in] arg Argument to the function.
* @param [in] flags Callback flags, see @ref ucs_callbackq_flags
* @param [inout] id_p Filled with callback ID.
*/
void uct_worker_slowpath_progress_register(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem);
void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func,
void *arg, unsigned flags,
uct_worker_cb_id_t *id_p);


/**
Expand All @@ -783,14 +749,12 @@ void uct_worker_slowpath_progress_register(uct_worker_h worker,
*
* Remove a function previously added by @ref uct_worker_progress_register_safe.
*
* @param [in] worker Handle to worker.
* @param [in] elem Callback element to remove. Must be the same pointer
* added earlier.
*
* @note This operation could potentially be slow.
* @param [in] worker Handle to worker.
* @param [inout] id_p Callback ID to remove, after a call to this
* function it's set to UCS_CALLBACKQ_ID_NULL.
*/
void uct_worker_slowpath_progress_unregister(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem);
void uct_worker_progress_unregister_safe(uct_worker_h worker,
uct_worker_cb_id_t *id_p);


/**
Expand Down
1 change: 1 addition & 0 deletions src/uct/api/uct_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ typedef struct uct_iface_addr uct_iface_addr_t;
typedef struct uct_ep_addr uct_ep_addr_t;
typedef struct uct_tag_context uct_tag_context_t;
typedef uint64_t uct_tag_t; /* tag type - 64 bit */
typedef void* uct_worker_cb_id_t; /* slow-path callback ID, NULL when not registered */
/**
* @}
*/
Expand Down
1 change: 1 addition & 0 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops, uct_md_h md,
self->am_tracer_arg = NULL;
self->err_handler = params->err_handler;
self->err_handler_arg = params->err_handler_arg;
uct_worker_progress_init(&self->prog);

for (id = 0; id < UCT_AM_ID_MAX; ++id) {
uct_iface_set_stub_am_handler(self, id);
Expand Down
3 changes: 3 additions & 0 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#ifndef UCT_IFACE_H_
#define UCT_IFACE_H_

#include "uct_worker.h"

#include <uct/api/uct.h>
#include <ucs/config/parser.h>
#include <ucs/datastruct/mpool.h>
Expand Down Expand Up @@ -178,6 +180,7 @@ typedef struct uct_base_iface {
uct_iface_t super;
uct_md_h md; /* MD this interface is using */
uct_worker_h worker; /* Worker this interface is on */
uct_worker_progress_t prog;
UCS_STATS_NODE_DECLARE(stats); /* Statistics */
uct_am_handler_t am[UCT_AM_ID_MAX]; /* Active message table */
uct_am_tracer_t am_tracer; /* Active message tracer */
Expand Down
80 changes: 68 additions & 12 deletions src/uct/base/uct_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
#include "uct_worker.h"

#include <ucs/type/class.h>
#include <ucs/async/async.h>
#include <ucs/datastruct/callbackq.inl>


/*
 * Slow-path callback element allocated internally by the worker code.
 * It bundles the callback queue element with the user's callback and
 * argument, so the proxy registered on the queue can call cb(arg).
 */
typedef struct uct_worker_slowpath_elem {
ucs_callbackq_slow_elem_t super; /* Element added to the worker's slow-path callback queue */
ucs_callback_t cb; /* User callback invoked by the proxy */
void *arg; /* Argument passed to cb */
} uct_worker_slowpath_elem_t;


static UCS_CLASS_INIT_FUNC(uct_worker_t, ucs_async_context_t *async,
ucs_thread_mode_t thread_mode)
{
self->async = async;
self->thread_mode = thread_mode;
ucs_callbackq_init(&self->progress_q, 64, async);
ucs_callbackq_init(&self->progress_q, 64, NULL);
ucs_list_head_init(&self->tl_data);
return UCS_OK;
}
Expand All @@ -37,27 +45,75 @@ void uct_worker_progress(uct_worker_h worker)
ucs_callbackq_dispatch(&worker->progress_q);
}

void uct_worker_progress_register(uct_worker_h worker,
ucs_callback_t func, void *arg)
/**
 * Reset a progress handle to its unregistered state: zero reference count,
 * no callback and no argument.
 *
 * @param [out] prog   Progress handle to initialize.
 */
void uct_worker_progress_init(uct_worker_progress_t *prog)
{
    prog->refcount = 0;
    prog->arg      = NULL;
    prog->cb       = NULL;
}

void uct_worker_progress_register(uct_worker_h worker, ucs_callback_t cb,
void *arg, uct_worker_progress_t *prog)
{
ucs_callbackq_add(&worker->progress_q, func, arg);
UCS_ASYNC_BLOCK(worker->async);
if (prog->refcount++ == 0) {
prog->cb = cb;
prog->arg = arg;
ucs_callbackq_add(&worker->progress_q, cb, arg);
}
UCS_ASYNC_UNBLOCK(worker->async);
}

void uct_worker_progress_unregister(uct_worker_h worker,
ucs_callback_t func, void *arg)
uct_worker_progress_t *prog)
{
ucs_callbackq_remove(&worker->progress_q, func, arg);
UCS_ASYNC_BLOCK(worker->async);
ucs_assert(prog->refcount > 0);
if (--prog->refcount == 0) {
ucs_callbackq_remove(&worker->progress_q, prog->cb, prog->arg);
}
UCS_ASYNC_UNBLOCK(worker->async);
}

void uct_worker_slowpath_progress_register(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem)
static void uct_worker_slowpath_proxy(ucs_callbackq_slow_elem_t *self)
{
ucs_callbackq_add_slow_path(&worker->progress_q, elem);
uct_worker_slowpath_elem_t *elem = ucs_derived_of(self, uct_worker_slowpath_elem_t);
elem->cb(elem->arg);
}

void uct_worker_slowpath_progress_unregister(uct_worker_h worker,
ucs_callbackq_slow_elem_t *elem)
void uct_worker_progress_register_safe(uct_worker_h worker, ucs_callback_t func,
void *arg, unsigned flags,
uct_worker_cb_id_t *id_p)
{
ucs_callbackq_remove_slow_path(&worker->progress_q, elem);
uct_worker_slowpath_elem_t *elem;

if (*id_p == NULL) {
UCS_ASYNC_BLOCK(worker->async);

elem = ucs_malloc(sizeof(*elem), "uct_worker_slowpath_elem");
ucs_assert_always(elem != NULL);

elem->super.cb = uct_worker_slowpath_proxy;
elem->cb = func;
elem->arg = arg;
ucs_callbackq_add_slow_path(&worker->progress_q, &elem->super);
*id_p = elem;

UCS_ASYNC_UNBLOCK(worker->async);
}
}

/**
 * Remove a slow-path callback identified by *id_p from the worker's progress
 * queue and free the internally-allocated element behind it.
 *
 * @param [in]    worker  Handle to worker.
 * @param [inout] id_p    Callback ID to remove. If it is already NULL the
 *                        call does nothing; otherwise it is reset to NULL
 *                        once the element has been removed and freed.
 */
void uct_worker_progress_unregister_safe(uct_worker_h worker,
                                         uct_worker_cb_id_t *id_p)
{
    uct_worker_slowpath_elem_t *slow_elem;

    /* Nothing registered under this ID */
    if (*id_p == NULL) {
        return;
    }

    /* Queue removal and release are done with the async context blocked */
    UCS_ASYNC_BLOCK(worker->async);
    slow_elem = *id_p;
    ucs_callbackq_remove_slow_path(&worker->progress_q, &slow_elem->super);
    ucs_free(slow_elem);
    UCS_ASYNC_UNBLOCK(worker->async);

    *id_p = NULL;
}
Loading

0 comments on commit 58bf758

Please sign in to comment.