Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/TAG: Move TM stuff from context to worker #2031

Merged
merged 3 commits into from
Dec 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -938,20 +938,12 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver
goto err_free_resources;
}

/* initialize tag matching */
status = ucp_tag_match_init(&context->tm);
if (status != UCS_OK) {
goto err_rkey_mp_cleanup;
}

ucs_debug("created ucp context %p [%d mds %d tls] features 0x%lx", context,
context->num_mds, context->num_tls, context->config.features);

*context_p = context;
return UCS_OK;

err_rkey_mp_cleanup:
ucs_mpool_cleanup(&context->rkey_mp, 1);
err_free_resources:
ucp_free_resources(context);
err_free_config:
Expand All @@ -964,7 +956,6 @@ ucs_status_t ucp_init_version(unsigned api_major_version, unsigned api_minor_ver

void ucp_cleanup(ucp_context_h context)
{
ucp_tag_match_cleanup(&context->tm);
ucs_mpool_cleanup(&context->rkey_mp, 1);
ucp_free_resources(context);
ucp_free_config(context);
Expand Down Expand Up @@ -1046,26 +1037,3 @@ void ucp_context_print_info(ucp_context_h context, FILE *stream)
fprintf(stream, "#\n");
}

void ucp_context_tag_offload_enable(ucp_context_h context)
{
ucp_worker_iface_t *offload_iface;

/* Enable offload, if only one tag offload capable interface is present
* (multiple offload ifaces are not supported yet). */
if (context->config.ext.tm_offload &&
(ucs_queue_length(&context->tm.offload.ifaces) == 1)) {
context->tm.offload.thresh = context->config.ext.tm_thresh;
context->tm.offload.zcopy_thresh = context->config.ext.tm_max_bcopy;

offload_iface = ucs_queue_head_elem_non_empty(&context->tm.offload.ifaces,
ucp_worker_iface_t, queue);
ucp_worker_iface_activate(offload_iface, 0);

ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu",
context->tm.offload.thresh, context->tm.offload.zcopy_thresh);
} else {
context->tm.offload.thresh = SIZE_MAX;
ucs_debug("Disable TM offload, multiple offload ifaces are not supported");
}
}

3 changes: 0 additions & 3 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "ucp_thread.h"

#include <ucp/api/ucp.h>
#include <ucp/tag/tag_match.h>
#include <uct/api/uct.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/queue_types.h>
Expand Down Expand Up @@ -129,8 +128,6 @@ typedef struct ucp_context {

ucs_mpool_t rkey_mp; /* Pool for memory keys */

ucp_tag_match_t tm; /* Tag-matching queues and offload info */

struct {

/* Bitmap of features supported by the context */
Expand Down
4 changes: 1 addition & 3 deletions src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,13 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request),

if (req->flags & UCP_REQUEST_FLAG_EXPECTED) {
UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock);
UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->context->mt_lock);

ucp_tag_exp_remove(&worker->context->tm, req);
ucp_tag_exp_remove(&worker->tm, req);
/* If tag posted to the transport need to wait its completion */
if (!(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) {
ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED);
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->context->mt_lock);
UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status)
ucs_status_string(status));
UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
if (req->flags & UCP_REQUEST_FLAG_BLOCK_OFFLOAD) {
--req->recv.worker->context->tm.offload.sw_req_count;
--req->recv.worker->tm.offload.sw_req_count;
}
ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info);
}
Expand Down
42 changes: 38 additions & 4 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,30 @@ void ucp_worker_iface_event(int fd, void *arg)
ucp_worker_signal_internal(worker);
}

static void ucp_worker_tag_offload_enable(ucp_worker_t *worker)
{
ucp_context_t *context = worker->context;
ucp_worker_iface_t *offload_iface;

/* Enable offload, if only one tag offload capable interface is present
* (multiple offload ifaces are not supported yet). */
if (context->config.ext.tm_offload &&
(ucs_queue_length(&worker->tm.offload.ifaces) == 1)) {
worker->tm.offload.thresh = context->config.ext.tm_thresh;
worker->tm.offload.zcopy_thresh = context->config.ext.tm_max_bcopy;

offload_iface = ucs_queue_head_elem_non_empty(&worker->tm.offload.ifaces,
ucp_worker_iface_t, queue);
ucp_worker_iface_activate(offload_iface, 0);

ucs_debug("Enable TM offload: thresh %zu, zcopy_thresh %zu",
worker->tm.offload.thresh, worker->tm.offload.zcopy_thresh);
} else {
worker->tm.offload.thresh = SIZE_MAX;
ucs_debug("Disable TM offload, multiple offload ifaces are not supported");
}
}

static void ucp_worker_close_ifaces(ucp_worker_h worker)
{
ucp_rsc_index_t rsc_index;
Expand Down Expand Up @@ -643,8 +667,8 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker)
}

if (ucp_worker_is_tl_tag_offload(worker, rsc_index)) {
ucs_queue_remove(&worker->context->tm.offload.ifaces, &wiface->queue);
ucp_context_tag_offload_enable(worker->context);
ucs_queue_remove(&worker->tm.offload.ifaces, &wiface->queue);
ucp_worker_tag_offload_enable(worker);
}

uct_iface_close(wiface->iface);
Expand Down Expand Up @@ -753,8 +777,8 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id,

if (ucp_worker_is_tl_tag_offload(worker, tl_id)) {
worker->ifaces[tl_id].rsc_index = tl_id;
ucs_queue_push(&context->tm.offload.ifaces, &wiface->queue);
ucp_context_tag_offload_enable(context);
ucs_queue_push(&worker->tm.offload.ifaces, &wiface->queue);
ucp_worker_tag_offload_enable(worker);
}

return UCS_OK;
Expand Down Expand Up @@ -1123,6 +1147,13 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
cpu_mask = &empty_cpu_mask;
}

/* Initialize tag matching */
status = ucp_tag_match_init(&worker->tm);
if (status != UCS_OK) {
goto err_wakeup_cleanup;
}


/* Open all resources as interfaces on this worker */
for (tl_id = 0; tl_id < context->num_tls; ++tl_id) {
status = ucp_worker_add_iface(worker, tl_id, rx_headroom, cpu_mask);
Expand All @@ -1145,6 +1176,8 @@ ucs_status_t ucp_worker_create(ucp_context_h context,

err_close_ifaces:
ucp_worker_close_ifaces(worker);
ucp_tag_match_cleanup(&worker->tm);
err_wakeup_cleanup:
ucp_worker_wakeup_cleanup(worker);
err_rndv_lanes_mp_cleanup:
ucs_mpool_cleanup(&worker->rndv_get_mp, 1);
Expand Down Expand Up @@ -1184,6 +1217,7 @@ void ucp_worker_destroy(ucp_worker_h worker)
ucs_mpool_cleanup(&worker->reg_mp, 1);
ucs_mpool_cleanup(&worker->rndv_get_mp, 1);
ucp_worker_close_ifaces(worker);
ucp_tag_match_cleanup(&worker->tm);
ucp_worker_wakeup_cleanup(worker);
ucs_mpool_cleanup(&worker->req_mp, 1);
uct_worker_destroy(worker->uct);
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ucp_ep.h"
#include "ucp_thread.h"

#include <ucp/tag/tag_match.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/khash.h>
#include <ucs/datastruct/queue_types.h>
Expand Down Expand Up @@ -172,6 +173,7 @@ typedef struct ucp_worker {
ucs_mpool_t am_mp; /* Memory pool for AM receives */
ucs_mpool_t reg_mp; /* Registered memory pool */
ucp_mt_lock_t mt_lock; /* Configuration of multi-threading support */
ucp_tag_match_t tm; /* Tag-matching queues and offload info */

UCS_STATS_NODE_DECLARE(stats);
UCS_STATS_NODE_DECLARE(tm_offload_stats);
Expand Down
12 changes: 4 additions & 8 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,16 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
ucp_worker_h worker = arg;
ucp_eager_hdr_t *eager_hdr = data;
ucp_eager_first_hdr_t *eager_first_hdr = data;
ucp_context_h context = worker->context;
ucp_request_t *req;
ucs_status_t status;
size_t recv_len;
ucp_tag_t recv_tag;

UCP_THREAD_CS_ENTER_CONDITIONAL(&context->mt_lock);

ucs_assert(length >= hdr_len);
recv_tag = eager_hdr->super.tag;
recv_len = length - hdr_len;

req = ucp_tag_exp_search(&context->tm, recv_tag, recv_len, flags);
req = ucp_tag_exp_search(&worker->tm, recv_tag, recv_len, flags);
if (req != NULL) {
UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len);

Expand All @@ -85,7 +82,7 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
* because it arrived either:
* 1) via SW TM (e. g. peer doesn't support offload)
* 2) as unexpected via HW TM */
ucp_tag_offload_try_cancel(context, req, 1);
ucp_tag_offload_try_cancel(worker, req, 1);

if (flags & UCP_RECV_DESC_FLAG_LAST) {
req->recv.tag.info.length = recv_len;
Expand All @@ -111,11 +108,10 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,

status = UCS_OK;
} else {
status = ucp_tag_unexp_recv(&context->tm, worker, data, length, am_flags,
status = ucp_tag_unexp_recv(&worker->tm, worker, data, length, am_flags,
hdr_len, flags);
}

UCP_THREAD_CS_EXIT_CONDITIONAL(&context->mt_lock);
return status;
}

Expand Down Expand Up @@ -182,7 +178,7 @@ static ucs_status_t ucp_eager_offload_sync_ack_handler(void *arg, void *data,
{
ucp_offload_ssend_hdr_t *rep_hdr = data;
ucp_worker_t *worker = arg;
ucs_queue_head_t *queue = &worker->context->tm.offload.sync_reqs;
ucs_queue_head_t *queue = &worker->tm.offload.sync_reqs;
ucp_request_t *sreq;
ucs_queue_iter_t iter;

Expand Down
Loading