Skip to content

Commit

Permalink
Merge pull request #1933 from evgeny-leksikov/ucp_stream_recv_nb_prep
Browse files Browse the repository at this point in the history
UCP/STREAM: add ucp_stream_worker_poll impl and EP extensions (STREAM)
  • Loading branch information
yosefe authored Nov 10, 2017
2 parents e171fc1 + c61896c commit ac985b7
Show file tree
Hide file tree
Showing 20 changed files with 298 additions and 164 deletions.
1 change: 1 addition & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ noinst_HEADERS = \
core/ucp_thread.h \
core/ucp_types.h \
dt/dt.h \
dt/dt.inl \
dt/dt_contig.h \
dt/dt_iov.h \
dt/dt_generic.h \
Expand Down
46 changes: 38 additions & 8 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ucp/wireup/wireup.h>
#include <ucp/tag/eager.h>
#include <ucp/tag/offload.h>
#include <ucs/datastruct/queue.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/sys/string.h>
Expand Down Expand Up @@ -72,7 +73,25 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
ep->cfg_index = ucp_worker_get_ep_config(worker, &key);
ep->am_lane = UCP_NULL_LANE;
ep->flags = 0;
ucs_queue_head_init(&ep->stream_data);

if (worker->context->config.features & UCP_FEATURE_STREAM) {
ep->ext.stream = ucs_calloc(1, sizeof(*ep->ext.stream),
"ucp ep stream extension");
if (ep->ext.stream == NULL) {
ucs_error("Failed to allocate ucp ep stream extension");
status = UCS_ERR_NO_MEMORY;
goto err_free_ep;
}

ucs_queue_head_init(&ep->ext.stream->data);
ucs_queue_head_init(&ep->ext.stream->reqs);
ep->ext.stream->ucp_ep = ep;
ep->ext.stream->rdesc = NULL;
ep->ext.stream->rdesc_offset = 0;
ep->ext.stream->rdesc_len = 0;
} else {
ep->ext.stream = NULL;
}

#if ENABLE_DEBUG_DATA
ucs_snprintf_zero(ep->peer_name, UCP_WORKER_NAME_MAX, "%s", peer_name);
Expand All @@ -82,7 +101,7 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
status = UCS_STATS_NODE_ALLOC(&ep->stats, &ucp_ep_stats_class,
worker->stats, "-%p", ep);
if (status != UCS_OK) {
goto err_free_ep;
goto err_free_ext_ep;
}

hash_it = kh_put(ucp_worker_ep_hash, &worker->ep_hash, dest_uuid,
Expand All @@ -103,6 +122,8 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,

err_free_stats:
UCS_STATS_NODE_FREE(ep->stats);
err_free_ext_ep:
ucs_free(ep->ext.stream);
err_free_ep:
ucs_free(ep);
err:
Expand Down Expand Up @@ -199,7 +220,7 @@ ucp_ep_adjust_params(ucp_ep_h ep, const ucp_ep_params_t *params)
{
ucs_status_t status = UCS_OK;

/* TODO: handle a case where the existing endpoint is incomplete */
/* handle a case where the existing endpoint is incomplete */

if (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) {
if (ucp_ep_config(ep)->key.err_mode != params->err_mode) {
Expand Down Expand Up @@ -349,17 +370,21 @@ void ucp_ep_destroy_internal(ucp_ep_h ep, const char *message)
}

UCS_STATS_NODE_FREE(ep->stats);
ucs_free(ep->ext.stream);
ucs_free(ep);
}

static void ucp_ep_disconnected(ucp_ep_h ep)
static void ucp_ep_ext_stream_cleanup(ucp_ep_h ep)
{
ucp_recv_desc_t *rdesc;
ucp_ep_ext_stream_t *ep_stream = ep->ext.stream;
ucp_recv_desc_t *rdesc;

ucs_trace("ep %p is disconnected", ep);
if (ep_stream == NULL) {
return;
}

while (!ucs_queue_is_empty(&ep->stream_data)) {
rdesc = ucs_queue_pull_elem_non_empty(&ep->stream_data, ucp_recv_desc_t,
while (!ucs_queue_is_empty(&ep_stream->data)) {
rdesc = ucs_queue_pull_elem_non_empty(&ep_stream->data, ucp_recv_desc_t,
stream_queue);

if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_UCT_DESC)) {
Expand All @@ -369,6 +394,11 @@ static void ucp_ep_disconnected(ucp_ep_h ep)
ucs_mpool_put_inline(rdesc);
}
}
}

static void ucp_ep_disconnected(ucp_ep_h ep)
{
ucp_ep_ext_stream_cleanup(ep);

if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
/* Endpoints which have remote connection are destroyed only when the
Expand Down
37 changes: 32 additions & 5 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ enum {
UCP_EP_FLAG_CONNECT_REQ_QUEUED = UCS_BIT(2), /* Connection request was queued */
UCP_EP_FLAG_TAG_OFFLOAD_ENABLED = UCS_BIT(3), /* Endpoint uses tl offload for tag matching */
UCP_EP_FLAG_FAILED = UCS_BIT(4), /* EP is in failed state */
UCP_EP_FLAG_STREAM_IS_QUEUED = UCS_BIT(5), /* EP is queued in stream list of worker */

UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(5), /* DEBUG: Connection request was sent */
UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(6), /* DEBUG: Connection reply was sent */
UCP_EP_FLAG_CONNECT_ACK_SENT = UCS_BIT(7) /* DEBUG: Connection ACK was sent */
/* DEBUG bits */
UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(8), /* DEBUG: Connection request was sent */
UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(9), /* DEBUG: Connection reply was sent */
UCP_EP_FLAG_CONNECT_ACK_SENT = UCS_BIT(10) /* DEBUG: Connection ACK was sent */
};


Expand Down Expand Up @@ -182,6 +184,25 @@ typedef struct ucp_ep_config {
} ucp_ep_config_t;


/**
* UCP_FEATURE_STREAM specific extention of the remote protocol layer endpoint
*/
typedef struct ucp_ep_ext_stream {
/* ep which owns the extension */
ucp_ep_h ucp_ep;
/* List entry in worker's EP list */
ucs_list_link_t list;
/* Queue of receive requests posted on the EP */
ucs_queue_head_t reqs;
/* Queue of receive descriptors with data */
ucs_queue_head_t data;
/* Partially handled desc */
void *rdesc;
size_t rdesc_len;
size_t rdesc_offset;
} ucp_ep_ext_stream_t;


/**
* Remote protocol layer endpoint
*/
Expand All @@ -190,14 +211,16 @@ typedef struct ucp_ep {

ucp_ep_cfg_index_t cfg_index; /* Configuration index */
ucp_lane_index_t am_lane; /* Cached value */
#if ENABLE_DEBUG_DATA
uint16_t flags; /* Endpoint flags */
#else
uint8_t flags; /* Endpoint flags */
#endif

uint64_t dest_uuid; /* Destination worker uuid */
void *user_data; /* user data associated with
the endpoint */

ucs_queue_head_t stream_data; /* Queue of receive descriptors
with data */
UCS_STATS_NODE_DECLARE(stats);

#if ENABLE_DEBUG_DATA
Expand All @@ -207,6 +230,10 @@ typedef struct ucp_ep {
/* TODO allocate ep dynamically according to number of lanes */
uct_ep_h uct_eps[UCP_MAX_LANES]; /* Transports for every lane */

/* Feature specific extentions allocated on demand */
struct {
ucp_ep_ext_stream_t *stream; /* UCP_FEATURE_STREAM */
} ext;
} ucp_ep_t;


Expand Down
6 changes: 3 additions & 3 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ucs_status_t ucp_tag_recv_request_test(void *request, ucp_tag_recv_info_t *info)

if (status != UCS_INPROGRESS) {
ucs_assert(req->flags & UCP_REQUEST_FLAG_RECV);
*info = req->recv.info;
*info = req->recv.tag.info;
}

return status;
Expand Down Expand Up @@ -99,7 +99,7 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request),
ucp_tag_exp_remove(&worker->context->tm, req);
/* If tag posted to the transport need to wait its completion */
if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
ucp_request_complete_recv(req, UCS_ERR_CANCELED);
ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED);
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock);
Expand Down Expand Up @@ -291,7 +291,7 @@ ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info)

if (req->flags & UCP_REQUEST_FLAG_COMPLETED) {
if (req->flags & UCP_REQUEST_FLAG_RECV) {
*info = req->recv.info;
*info = req->recv.tag.info;
}
ucs_assert(req->status != UCS_INPROGRESS);
return req->status;
Expand Down
20 changes: 15 additions & 5 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,25 @@ struct ucp_request {
void *buffer; /* Buffer to receive data to */
ucp_datatype_t datatype; /* Receive type */
size_t length; /* Total length, in bytes */
ucp_tag_t tag; /* Expected tag */
ucp_tag_t tag_mask; /* Expected tag mask */
uint64_t sn; /* Tag match sequence */
ucp_tag_recv_callback_t cb; /* Completion callback */
ucp_tag_recv_info_t info; /* Completion info to fill */
ucp_dt_state_t state;
ucp_worker_t *worker;
ucp_mem_desc_t *rdesc;
uct_tag_context_t uct_ctx; /* Transport offload context */

union {
struct {
ucp_tag_t tag; /* Expected tag */
ucp_tag_t tag_mask; /* Expected tag mask */
uint64_t sn; /* Tag match sequence */
ucp_tag_recv_callback_t cb; /* Completion callback */
ucp_tag_recv_info_t info; /* Completion info to fill */
} tag;

struct {
ucp_stream_recv_callback_t cb; /* Completion callback */
size_t count; /* Completion info to fill */
} stream;
};
} recv;

struct {
Expand Down
19 changes: 16 additions & 3 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,31 @@ ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_recv(ucp_request_t *req, ucs_status_t status)
ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status)
{
ucs_trace_req("completing receive request %p (%p) "UCP_REQUEST_FLAGS_FMT
" stag 0x%"PRIx64" len %zu, %s",
req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
req->recv.info.sender_tag, req->recv.info.length,
req->recv.tag.info.sender_tag, req->recv.tag.info.length,
ucs_status_string(status));
UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) {
--req->recv.worker->context->tm.offload.sw_req_count;
}
ucp_request_complete(req, recv.cb, status, &req->recv.info);
ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_stream_recv(ucp_request_t *req, ucs_status_t status)
{
ucs_assert(req->recv.state.offset > 0);
req->recv.stream.count = req->recv.state.offset;
ucs_trace_req("completing stream receive request %p (%p) "
UCP_REQUEST_FLAGS_FMT" count %zu, %s",
req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
req->recv.stream.count, ucs_status_string(status));
UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
ucp_request_complete(req, recv.stream.cb, status, req->recv.stream.count);
}

/*
Expand Down
33 changes: 26 additions & 7 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->ep_config_max = config_count;
worker->ep_config_count = 0;
ucs_list_head_init(&worker->arm_ifaces);
ucs_list_head_init(&worker->stream_eps);

if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) {
worker->user_data = params->user_data;
Expand Down Expand Up @@ -1216,6 +1217,31 @@ unsigned ucp_worker_progress(ucp_worker_h worker)
return count;
}

ssize_t ucp_stream_worker_poll(ucp_worker_h worker,
ucp_stream_poll_ep_t *poll_eps,
size_t max_eps, unsigned flags)
{
ucp_ep_ext_stream_t *ep_stream;
ucp_ep_h ep;
ssize_t count = 0;

UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock);

while ((count < max_eps) && !ucs_list_is_empty(&worker->stream_eps)) {
ep_stream = ucs_list_extract_head(&worker->stream_eps,
ucp_ep_ext_stream_t, list);
ep = ep_stream->ucp_ep;
ep->flags &= ~UCP_EP_FLAG_STREAM_IS_QUEUED;
poll_eps[count].ep = ep;
poll_eps[count].user_data = ep->user_data;
++count;
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock);

return count;
}

ucs_status_t ucp_worker_get_efd(ucp_worker_h worker, int *fd)
{
ucs_status_t status;
Expand All @@ -1231,13 +1257,6 @@ ucs_status_t ucp_worker_get_efd(ucp_worker_h worker, int *fd)
return status;
}

ssize_t ucp_stream_worker_poll(ucp_worker_h worker,
ucp_stream_poll_ep_t *poll_eps, size_t max_eps,
unsigned flags)
{
return UCS_ERR_NOT_IMPLEMENTED;
}

ucs_status_t ucp_worker_arm(ucp_worker_h worker)
{
ucp_worker_iface_t *wiface;
Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ typedef struct ucp_worker {
ucs_list_link_t arm_ifaces; /* List of interfaces to arm */

void *user_data; /* User-defined data */
ucs_list_link_t stream_eps; /* List of EPs with received stream data */
khash_t(ucp_worker_ep_hash) ep_hash; /* Hash table of all endpoints */
khash_t(ucp_ep_errh_hash) ep_errh_hash; /* Hash table of error handlers associated with endpoints */
ucp_worker_iface_t *ifaces; /* Array of interfaces, one for each resource */
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/dt/dt.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "dt.h"

#include <ucp/core/ucp_request.h>
#include <ucs/debug/profile.h>

size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
ucp_dt_state_t *state, size_t length)
Expand Down
Loading

0 comments on commit ac985b7

Please sign in to comment.