Skip to content

Commit

Permalink
Merge pull request #1610 from yosefe/topic/uct-iface-organize-ops
Browse files Browse the repository at this point in the history
UCT: Cleanup interface operations definitions.
  • Loading branch information
yosefe authored Jun 20, 2017
2 parents ad76066 + d337101 commit fc0b0a5
Show file tree
Hide file tree
Showing 21 changed files with 449 additions and 419 deletions.
178 changes: 94 additions & 84 deletions src/uct/api/tl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,13 @@


/**
* Transport iface operations.
* Transport interface operations.
* Every operation exposed in the API should appear in the table below, to allow
* creating interface/endpoint with custom operations.
*/
typedef struct uct_iface_ops {

void (*iface_close)(uct_iface_h iface);

ucs_status_t (*iface_query)(uct_iface_h iface,
uct_iface_attr_t *iface_attr);

ucs_status_t (*iface_flush)(uct_iface_h iface, unsigned flags,
uct_completion_t *comp);

ucs_status_t (*iface_fence)(uct_iface_h iface, unsigned flags);

ucs_status_t (*iface_wakeup_open)(uct_iface_h iface, unsigned events,
uct_wakeup_h wakeup);

ucs_status_t (*iface_wakeup_get_fd)(uct_wakeup_h wakeup, int *fd_p);

ucs_status_t (*iface_wakeup_arm)(uct_wakeup_h wakeup);

ucs_status_t (*iface_wakeup_wait)(uct_wakeup_h wakeup);

ucs_status_t (*iface_wakeup_signal)(uct_wakeup_h wakeup);

void (*iface_wakeup_close)(uct_wakeup_h wakeup);

ucs_status_t (*iface_tag_recv_zcopy)(uct_iface_h iface, uct_tag_t tag,
uct_tag_t tag_mask,
const uct_iov_t *iov,
size_t iovcnt,
uct_tag_context_t *ctx);

ucs_status_t (*iface_tag_recv_cancel)(uct_iface_h iface,
uct_tag_context_t *ctx,
int force);


/* Connection establishment */

ucs_status_t (*ep_create)(uct_iface_h iface, uct_ep_h *ep_p);

ucs_status_t (*ep_create_connected)(uct_iface_h iface,
const uct_device_addr_t *dev_addr,
const uct_iface_addr_t *iface_addr,
uct_ep_h* ep_p);

void (*ep_destroy)(uct_ep_h ep);

ucs_status_t (*ep_get_address)(uct_ep_h ep, uct_ep_addr_t *addr);

ucs_status_t (*ep_connect_to_ep)(uct_ep_h ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr);

ucs_status_t (*iface_get_device_address)(uct_iface_h iface,
uct_device_addr_t *addr);

ucs_status_t (*iface_get_address)(uct_iface_h iface, uct_iface_addr_t *addr);

int (*iface_is_reachable)(const uct_iface_h iface,
const uct_device_addr_t *dev_addr,
const uct_iface_addr_t *iface_addr);

/* Put */
/* endpoint - put */

ucs_status_t (*ep_put_short)(uct_ep_h ep, const void *buffer, unsigned length,
uint64_t remote_addr, uct_rkey_t rkey);
Expand All @@ -95,7 +37,7 @@ typedef struct uct_iface_ops {
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp);

/* Get */
/* endpoint - get */

ucs_status_t (*ep_get_bcopy)(uct_ep_h ep, uct_unpack_callback_t unpack_cb,
void *arg, size_t length,
Expand All @@ -106,7 +48,7 @@ typedef struct uct_iface_ops {
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp);

/* Active message */
/* endpoint - active message */

ucs_status_t (*ep_am_short)(uct_ep_h ep, uint8_t id, uint64_t header,
const void *payload, unsigned length);
Expand All @@ -118,7 +60,7 @@ typedef struct uct_iface_ops {
unsigned header_length, const uct_iov_t *iov,
size_t iovcnt, uct_completion_t *comp);

/* Atomics */
/* endpoint - atomics */

ucs_status_t (*ep_atomic_add64)(uct_ep_h ep, uint64_t add,
uint64_t remote_addr, uct_rkey_t rkey);
Expand Down Expand Up @@ -150,25 +92,7 @@ typedef struct uct_iface_ops {
uint64_t remote_addr, uct_rkey_t rkey,
uint32_t *result, uct_completion_t *comp);

/* Pending queue */

ucs_status_t (*ep_pending_add)(uct_ep_h ep, uct_pending_req_t *n);

void (*ep_pending_purge)(uct_ep_h ep, uct_pending_purge_callback_t cb,
void * arg);

/* TODO purge per iface */

/* Synchronization */

ucs_status_t (*ep_flush)(uct_ep_h ep, unsigned flags,
uct_completion_t *comp);

ucs_status_t (*ep_fence)(uct_ep_h ep, unsigned flags);

ucs_status_t (*ep_check)(uct_ep_h ep, unsigned flags, uct_completion_t *comp);

/* Tagged operations */
/* endpoint - tagged operations */

ucs_status_t (*ep_tag_eager_short)(uct_ep_h ep, uct_tag_t tag,
const void *data, size_t length);
Expand All @@ -192,6 +116,92 @@ typedef struct uct_iface_ops {
ucs_status_t (*ep_tag_rndv_request)(uct_ep_h ep, uct_tag_t tag,
const void* header,
unsigned header_length);

/* interface - tagged operations */

ucs_status_t (*iface_tag_recv_zcopy)(uct_iface_h iface, uct_tag_t tag,
uct_tag_t tag_mask,
const uct_iov_t *iov,
size_t iovcnt,
uct_tag_context_t *ctx);

ucs_status_t (*iface_tag_recv_cancel)(uct_iface_h iface,
uct_tag_context_t *ctx,
int force);

/* endpoint - pending queue */

ucs_status_t (*ep_pending_add)(uct_ep_h ep, uct_pending_req_t *n);

void (*ep_pending_purge)(uct_ep_h ep, uct_pending_purge_callback_t cb,
void *arg);

/* endpoint - synchronization */

ucs_status_t (*ep_flush)(uct_ep_h ep, unsigned flags,
uct_completion_t *comp);

ucs_status_t (*ep_fence)(uct_ep_h ep, unsigned flags);

ucs_status_t (*ep_check)(uct_ep_h ep, unsigned flags, uct_completion_t *comp);

/* endpoint - connection establishment */

ucs_status_t (*ep_create)(uct_iface_h iface, uct_ep_h *ep_p);

ucs_status_t (*ep_create_connected)(uct_iface_h iface,
const uct_device_addr_t *dev_addr,
const uct_iface_addr_t *iface_addr,
uct_ep_h* ep_p);

void (*ep_destroy)(uct_ep_h ep);

ucs_status_t (*ep_get_address)(uct_ep_h ep, uct_ep_addr_t *addr);

ucs_status_t (*ep_connect_to_ep)(uct_ep_h ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr);

/* interface - synchronization */

ucs_status_t (*iface_flush)(uct_iface_h iface, unsigned flags,
uct_completion_t *comp);

ucs_status_t (*iface_fence)(uct_iface_h iface, unsigned flags);

/* interface - events and progress */

ucs_status_t (*iface_wakeup_open)(uct_iface_h iface, unsigned events,
uct_wakeup_h wakeup);

ucs_status_t (*iface_wakeup_get_fd)(uct_wakeup_h wakeup, int *fd_p);

ucs_status_t (*iface_wakeup_arm)(uct_wakeup_h wakeup);

ucs_status_t (*iface_wakeup_wait)(uct_wakeup_h wakeup);

ucs_status_t (*iface_wakeup_signal)(uct_wakeup_h wakeup);

void (*iface_wakeup_close)(uct_wakeup_h wakeup);

/* interface - management */

void (*iface_close)(uct_iface_h iface);

ucs_status_t (*iface_query)(uct_iface_h iface,
uct_iface_attr_t *iface_attr);

/* interface - connection establishment */

ucs_status_t (*iface_get_device_address)(uct_iface_h iface,
uct_device_addr_t *addr);

ucs_status_t (*iface_get_address)(uct_iface_h iface, uct_iface_addr_t *addr);

int (*iface_is_reachable)(const uct_iface_h iface,
const uct_device_addr_t *dev_addr,
const uct_iface_addr_t *iface_addr);

} uct_iface_ops_t;


Expand Down
57 changes: 29 additions & 28 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,27 @@ void uct_iface_close(uct_iface_h iface)
iface->ops.iface_close(iface);
}

static ucs_status_t uct_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp)
ucs_status_t uct_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp)
{
UCT_TL_IFACE_STAT_FLUSH(ucs_derived_of(tl_iface, uct_base_iface_t));
return UCS_OK;
}

static ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags)
ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags)
{
UCT_TL_IFACE_STAT_FENCE(ucs_derived_of(tl_iface, uct_base_iface_t));
return UCS_OK;
}

static ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp)
ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp)
{
UCT_TL_EP_STAT_FLUSH(ucs_derived_of(tl_ep, uct_base_ep_t));
return UCS_OK;
}

static ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags)
ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags)
{
UCT_TL_EP_STAT_FENCE(ucs_derived_of(tl_ep, uct_base_ep_t));
return UCS_OK;
Expand Down Expand Up @@ -295,12 +295,6 @@ void uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep, uct_iface_h tl_iface)
* Failed ep will use that queue for purge. */
uct_ep_pending_purge(tl_ep, uct_ep_failed_purge_cb, &f_iface->pend_q);

ops->ep_get_address = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_connect_to_ep = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_flush = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_destroy = uct_ep_failed_destroy;
ops->ep_pending_add = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_pending_purge = uct_ep_failed_purge;
ops->ep_put_short = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_put_bcopy = (void*)ucs_empty_function_return_bc_ep_timeout;
ops->ep_put_zcopy = (void*)ucs_empty_function_return_ep_timeout;
Expand All @@ -317,6 +311,20 @@ void uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep, uct_iface_h tl_iface)
ops->ep_atomic_fadd32 = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_atomic_swap32 = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_atomic_cswap32 = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_tag_eager_short = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_tag_eager_bcopy = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_tag_eager_zcopy = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_tag_rndv_zcopy = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_tag_rndv_cancel = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_tag_rndv_request= (void*)ucs_empty_function_return_ep_timeout;
ops->ep_pending_add = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_pending_purge = uct_ep_failed_purge;
ops->ep_flush = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_fence = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_check = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_connect_to_ep = (void*)ucs_empty_function_return_ep_timeout;
ops->ep_destroy = uct_ep_failed_destroy;
ops->ep_get_address = (void*)ucs_empty_function_return_ep_timeout;

ucs_class_call_cleanup_chain(cls, tl_ep, -1);

Expand All @@ -333,24 +341,17 @@ void uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep, uct_iface_h tl_iface)

UCS_CLASS_INIT_FUNC(uct_iface_t, uct_iface_ops_t *ops)
{
ucs_assert_always(ops->ep_flush != NULL);
ucs_assert_always(ops->ep_fence != NULL);
ucs_assert_always(ops->ep_destroy != NULL);
ucs_assert_always(ops->iface_flush != NULL);
ucs_assert_always(ops->iface_fence != NULL);
ucs_assert_always(ops->iface_close != NULL);
ucs_assert_always(ops->iface_query != NULL);
ucs_assert_always(ops->iface_get_device_address != NULL);
ucs_assert_always(ops->iface_is_reachable != NULL);

self->ops = *ops;
if (ops->ep_flush == NULL) {
self->ops.ep_flush = uct_base_ep_flush;
}

if (ops->ep_fence == NULL) {
self->ops.ep_fence = uct_base_ep_fence;
}

if (ops->iface_flush == NULL) {
self->ops.iface_flush = uct_base_iface_flush;
}

if (ops->iface_fence == NULL) {
self->ops.iface_fence = uct_base_iface_fence;
}

return UCS_OK;
}

Expand Down
34 changes: 22 additions & 12 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,25 +177,25 @@ typedef struct uct_wakeup {
* Includes the AM table which we don't want to expose.
*/
typedef struct uct_base_iface {
uct_iface_t super;
uct_md_h md; /* MD this interface is using */
uct_worker_h worker; /* Worker this interface is on */
uct_worker_progress_t prog;
UCS_STATS_NODE_DECLARE(stats); /* Statistics */
uct_am_handler_t am[UCT_AM_ID_MAX]; /* Active message table */
uct_am_tracer_t am_tracer; /* Active message tracer */
void *am_tracer_arg; /* Tracer argument */
uct_iface_t super;
uct_md_h md; /* MD this interface is using */
uct_worker_h worker; /* Worker this interface is on */
uct_am_handler_t am[UCT_AM_ID_MAX];/* Active message table */
uct_am_tracer_t am_tracer; /* Active message tracer */
void *am_tracer_arg; /* Tracer argument */
uct_error_handler_t err_handler; /* Error handler */
void *err_handler_arg; /* Error handler argument */
uct_worker_progress_t prog;

struct {
unsigned num_alloc_methods;
uct_alloc_method_t alloc_methods[UCT_ALLOC_METHOD_LAST];
ucs_log_level_t failure_level;
} config;

uct_error_handler_t err_handler; /* Error handler */
void *err_handler_arg; /* Error handler argument */

UCS_STATS_NODE_DECLARE(stats); /* Statistics */
} uct_base_iface_t;

UCS_CLASS_DECLARE(uct_base_iface_t, uct_iface_ops_t*, uct_md_h, uct_worker_h,
const uct_iface_params_t*, const uct_iface_config_t*
UCS_STATS_ARG(ucs_stats_node_t*) UCS_STATS_ARG(const char*));
Expand Down Expand Up @@ -482,7 +482,17 @@ void uct_iface_mpool_empty_warn(uct_base_iface_t *iface, ucs_mpool_t *mp);

void uct_set_ep_failed(ucs_class_t* cls, uct_ep_h tl_ep, uct_iface_h tl_iface);

/**
ucs_status_t uct_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp);

ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags);

ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp);

ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags);

/*
* Invoke active message handler.
*
* @param iface Interface to invoke the handler for.
Expand Down
Loading

0 comments on commit fc0b0a5

Please sign in to comment.