Skip to content

Commit

Permalink
Merge pull request #2938 from evgeny-leksikov/uct_am_cb_refact
Browse files Browse the repository at this point in the history
UCT/API: remove redundant UCT_CB_FLAG_SYNC
  • Loading branch information
yosefe authored Oct 17, 2018
2 parents 6be0458 + a86036f commit a90c731
Show file tree
Hide file tree
Showing 28 changed files with 112 additions and 102 deletions.
2 changes: 1 addition & 1 deletion src/tools/perf/libperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ static void uct_perf_test_cleanup_endpoints(ucx_perf_context_t *perf)

uct_perf_barrier(perf);

uct_iface_set_am_handler(perf->uct.iface, UCT_PERF_TEST_AM_ID, NULL, NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(perf->uct.iface, UCT_PERF_TEST_AM_ID, NULL, NULL, 0);

group_size = rte_call(perf, group_size);
group_index = rte_call(perf, group_index);
Expand Down
12 changes: 8 additions & 4 deletions src/tools/perf/uct_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@ class uct_perf_test_runner {
uct_iface_attr_t attr;
status = uct_iface_query(m_perf.uct.iface, &attr);
ucs_assert_always(status == UCS_OK);
if (attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT|UCT_IFACE_FLAG_AM_BCOPY|UCT_IFACE_FLAG_AM_ZCOPY)) {
status = uct_iface_set_am_handler(m_perf.uct.iface, UCT_PERF_TEST_AM_ID,
am_hander, m_perf.recv_buffer, UCT_CB_FLAG_SYNC);
if (attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT |
UCT_IFACE_FLAG_AM_BCOPY |
UCT_IFACE_FLAG_AM_ZCOPY)) {
status = uct_iface_set_am_handler(m_perf.uct.iface,
UCT_PERF_TEST_AM_ID, am_hander,
m_perf.recv_buffer, 0);
ucs_assert_always(status == UCS_OK);
}
}

~uct_perf_test_runner() {
uct_iface_set_am_handler(m_perf.uct.iface, UCT_PERF_TEST_AM_ID, NULL, NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_perf.uct.iface, UCT_PERF_TEST_AM_ID, NULL,
NULL, 0);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ static void ucp_worker_set_am_handlers(ucp_worker_iface_t *wiface, int is_proxy)
continue;
}

if ((ucp_am_handlers[am_id].flags & UCT_CB_FLAG_SYNC) &&
if (!(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_ASYNC) &&
!(wiface->attr.cap.flags & UCT_IFACE_FLAG_CB_SYNC))
{
/* Do not register a sync callback on interface which does not
Expand All @@ -147,7 +147,7 @@ static void ucp_worker_set_am_handlers(ucp_worker_iface_t *wiface, int is_proxy)
/* we care only about sync active messages, and this also makes sure
* the counter is not accessed from another thread.
*/
ucs_assert(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_SYNC);
ucs_assert(!(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_ASYNC));
status = uct_iface_set_am_handler(wiface->iface, am_id,
ucp_am_handlers[am_id].proxy_cb,
wiface,
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ static void ucp_amo_sw_dump_packet(ucp_worker_h worker, uct_am_trace_type_t type
}

UCP_DEFINE_AM(UCP_FEATURE_AMO, UCP_AM_ID_ATOMIC_REQ, ucp_atomic_req_handler,
ucp_amo_sw_dump_packet, UCT_CB_FLAG_SYNC);
ucp_amo_sw_dump_packet, 0);
UCP_DEFINE_AM(UCP_FEATURE_AMO, UCP_AM_ID_ATOMIC_REP, ucp_atomic_rep_handler,
ucp_amo_sw_dump_packet, UCT_CB_FLAG_SYNC);
ucp_amo_sw_dump_packet, 0);

UCP_DEFINE_AM_PROXY(UCP_AM_ID_ATOMIC_REQ);
8 changes: 4 additions & 4 deletions src/ucp/rma/rma_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ static void ucp_rma_sw_dump_packet(ucp_worker_h worker, uct_am_trace_type_t type
}

UCP_DEFINE_AM(UCP_FEATURE_RMA, UCP_AM_ID_PUT, ucp_put_handler,
ucp_rma_sw_dump_packet, UCT_CB_FLAG_SYNC);
ucp_rma_sw_dump_packet, 0);
UCP_DEFINE_AM(UCP_FEATURE_RMA, UCP_AM_ID_GET_REQ, ucp_get_req_handler,
ucp_rma_sw_dump_packet, UCT_CB_FLAG_SYNC);
ucp_rma_sw_dump_packet, 0);
UCP_DEFINE_AM(UCP_FEATURE_RMA, UCP_AM_ID_GET_REP, ucp_get_rep_handler,
ucp_rma_sw_dump_packet, UCT_CB_FLAG_SYNC);
ucp_rma_sw_dump_packet, 0);
UCP_DEFINE_AM(UCP_FEATURE_RMA|UCP_FEATURE_AMO, UCP_AM_ID_CMPL,
ucp_rma_cmpl_handler, ucp_rma_sw_dump_packet, UCT_CB_FLAG_SYNC);
ucp_rma_cmpl_handler, ucp_rma_sw_dump_packet, 0);

UCP_DEFINE_AM_PROXY(UCP_AM_ID_PUT);
UCP_DEFINE_AM_PROXY(UCP_AM_ID_GET_REQ);
5 changes: 2 additions & 3 deletions src/ucp/stream/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,7 @@ static void ucp_stream_am_dump(ucp_worker_h worker, uct_am_trace_type_t type,
length - hdr_len);
}

UCP_DEFINE_AM(UCP_FEATURE_STREAM, UCP_AM_ID_STREAM_DATA,
ucp_stream_am_handler, ucp_stream_am_dump,
UCT_CB_FLAG_SYNC);
UCP_DEFINE_AM(UCP_FEATURE_STREAM, UCP_AM_ID_STREAM_DATA, ucp_stream_am_handler,
ucp_stream_am_dump, 0);

UCP_DEFINE_AM_PROXY(UCP_AM_ID_STREAM_DATA);
20 changes: 10 additions & 10 deletions src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,19 @@ static void ucp_eager_dump(ucp_worker_h worker, uct_am_trace_type_t type,
}

UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_ONLY, ucp_eager_only_handler,
ucp_eager_dump, UCT_CB_FLAG_SYNC);
ucp_eager_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_FIRST, ucp_eager_first_handler,
ucp_eager_dump, UCT_CB_FLAG_SYNC);
ucp_eager_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_MIDDLE, ucp_eager_middle_handler,
ucp_eager_dump, UCT_CB_FLAG_SYNC);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_SYNC_ONLY, ucp_eager_sync_only_handler,
ucp_eager_dump, UCT_CB_FLAG_SYNC);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_SYNC_FIRST, ucp_eager_sync_first_handler,
ucp_eager_dump, UCT_CB_FLAG_SYNC);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_SYNC_ACK, ucp_eager_sync_ack_handler,
ucp_eager_dump, UCT_CB_FLAG_SYNC);
ucp_eager_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_SYNC_ONLY,
ucp_eager_sync_only_handler, ucp_eager_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_SYNC_FIRST,
ucp_eager_sync_first_handler, ucp_eager_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_EAGER_SYNC_ACK,
ucp_eager_sync_ack_handler, ucp_eager_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_OFFLOAD_SYNC_ACK,
ucp_eager_offload_sync_ack_handler, ucp_eager_dump, UCT_CB_FLAG_SYNC);
ucp_eager_offload_sync_ack_handler, ucp_eager_dump, 0);

UCP_DEFINE_AM_PROXY(UCP_AM_ID_EAGER_ONLY);
UCP_DEFINE_AM_PROXY(UCP_AM_ID_EAGER_FIRST);
Expand Down
10 changes: 5 additions & 5 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1042,15 +1042,15 @@ static void ucp_rndv_dump(ucp_worker_h worker, uct_am_trace_type_t type,
}

UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTS, ucp_rndv_rts_handler,
ucp_rndv_dump, UCT_CB_FLAG_SYNC);
ucp_rndv_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_ATS, ucp_rndv_ats_handler,
ucp_rndv_dump, UCT_CB_FLAG_SYNC);
ucp_rndv_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_ATP, ucp_rndv_atp_handler,
ucp_rndv_dump, UCT_CB_FLAG_SYNC);
ucp_rndv_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_RTR, ucp_rndv_rtr_handler,
ucp_rndv_dump, UCT_CB_FLAG_SYNC);
ucp_rndv_dump, 0);
UCP_DEFINE_AM(UCP_FEATURE_TAG, UCP_AM_ID_RNDV_DATA, ucp_rndv_data_handler,
ucp_rndv_dump, UCT_CB_FLAG_SYNC);
ucp_rndv_dump, 0);

UCP_DEFINE_AM_PROXY(UCP_AM_ID_RNDV_RTS);
UCP_DEFINE_AM_PROXY(UCP_AM_ID_RNDV_ATS);
Expand Down
29 changes: 15 additions & 14 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,22 +338,23 @@ enum uct_msg_flags {
* @brief Callback flags.
*
* List of flags for a callback.
* A callback must have either the SYNC or ASYNC flag set.
*/
enum uct_cb_flags {
UCT_CB_FLAG_SYNC = UCS_BIT(1), /**< Callback is always invoked from the context (thread, process)
that called uct_iface_progress(). An interface must
have the @ref UCT_IFACE_FLAG_CB_SYNC flag set to support sync
callback invocation. */

UCT_CB_FLAG_ASYNC = UCS_BIT(2) /**< Callback may be invoked from any context. For example,
it may be called from a transport async progress thread. To guarantee
async invocation, the interface must have the @ref UCT_IFACE_FLAG_CB_ASYNC
flag set.
If async callback is requested on an interface
which only supports sync callback
(i.e., only the @ref UCT_IFACE_FLAG_CB_SYNC flag is set),
it will behave exactly like a sync callback. */
UCT_CB_FLAG_RESERVED = UCS_BIT(1), /**< Reserved for future use. */
UCT_CB_FLAG_ASYNC = UCS_BIT(2) /**< Callback may be invoked from any
context (thread, process). For
example, it may be called from a
transport async progress thread. To
guarantee async invocation, the
interface must have the @ref
UCT_IFACE_FLAG_CB_ASYNC flag set. If
async callback is requested on an
interface which only supports sync
callback (i.e., only the @ref
UCT_IFACE_FLAG_CB_SYNC flag is set),
the callback may be invoked only
from the context that called @ref
uct_iface_progress). */
};


Expand Down
11 changes: 5 additions & 6 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,17 @@ ucs_status_t uct_iface_set_am_handler(uct_iface_h tl_iface, uint8_t id,
return UCS_OK;
}

if (!(flags & (UCT_CB_FLAG_SYNC|UCT_CB_FLAG_ASYNC))) {
ucs_error("invalid active message flags 0x%x", flags);
return UCS_ERR_INVALID_PARAM;
}

status = uct_iface_query(tl_iface, &attr);
if (status != UCS_OK) {
return status;
}

UCT_CB_FLAGS_CHECK(flags);

/* If user wants a synchronous callback, it must be supported, or the
* callback could be called from another thread.
*/
if ((flags & UCT_CB_FLAG_SYNC) && !(attr.cap.flags & UCT_IFACE_FLAG_CB_SYNC)) {
if (!(flags & UCT_CB_FLAG_ASYNC) && !(attr.cap.flags & UCT_IFACE_FLAG_CB_SYNC)) {
ucs_error("Synchronous callback requested, but not supported");
return UCS_ERR_INVALID_PARAM;
}
Expand Down Expand Up @@ -412,6 +409,8 @@ UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops, uct_md_h md,

UCS_CLASS_CALL_SUPER_INIT(uct_iface_t, ops);

UCT_CB_FLAGS_CHECK(params->err_handler_flags);

self->md = md;
self->worker = ucs_derived_of(worker, uct_priv_worker_t);
self->am_tracer = NULL;
Expand Down
9 changes: 9 additions & 0 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ enum {
UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_TX_NO_DESC, 1);


#define UCT_CB_FLAGS_CHECK(_flags) \
do { \
if ((_flags) & UCT_CB_FLAG_RESERVED) { \
ucs_error("Unsupported callback flag 0x%x", UCT_CB_FLAG_RESERVED); \
return UCS_ERR_INVALID_PARAM; \
} \
} while (0)


/**
* In release mode - do nothing.
*
Expand Down
25 changes: 16 additions & 9 deletions src/uct/ib/rdmacm/rdmacm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@

#include "rdmacm_ep.h"


#define UCT_RDMACM_CB_FLAGS_CHECK(_flags) \
do { \
UCT_CB_FLAGS_CHECK(_flags); \
if (!((_flags) & UCT_CB_FLAG_ASYNC)) { \
return UCS_ERR_UNSUPPORTED; \
} \
} while (0)


ucs_status_t uct_rdmacm_ep_resolve_addr(uct_rdmacm_ep_t *ep)
{
uct_rdmacm_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rdmacm_iface_t);
Expand Down Expand Up @@ -88,9 +98,7 @@ static UCS_CLASS_INIT_FUNC(uct_rdmacm_ep_t, uct_iface_t *tl_iface,
return UCS_ERR_UNSUPPORTED;
}

if (cb_flags != UCT_CB_FLAG_ASYNC) {
return UCS_ERR_UNSUPPORTED;
}
UCT_RDMACM_CB_FLAGS_CHECK(cb_flags);

/* Initialize these fields before calling rdma_resolve_addr to avoid a race
* where they are used before being initialized (from the async thread
Expand Down Expand Up @@ -216,17 +224,16 @@ void uct_rdmacm_ep_set_failed(uct_iface_t *iface, uct_ep_h ep, ucs_status_t stat
uct_rdmacm_iface_t *rdmacm_iface = ucs_derived_of(iface, uct_rdmacm_iface_t);
uct_rdmacm_ep_t *rdmacm_ep = ucs_derived_of(ep, uct_rdmacm_ep_t);

if (rdmacm_iface->super.err_handler_flags == UCT_CB_FLAG_SYNC) {
rdmacm_ep->status = status;

if (rdmacm_iface->super.err_handler_flags & UCT_CB_FLAG_ASYNC) {
uct_set_ep_failed(&UCS_CLASS_NAME(uct_rdmacm_ep_t), &rdmacm_ep->super.super,
&rdmacm_iface->super.super, status);
} else {
/* invoke the error handling flow from the main thread */
rdmacm_ep->status = status;
uct_worker_progress_register_safe(&rdmacm_iface->super.worker->super,
uct_rdmacm_client_err_handle_progress,
rdmacm_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
&rdmacm_ep->slow_prog_id);
} else {
uct_set_ep_failed(&UCS_CLASS_NAME(uct_rdmacm_ep_t), &rdmacm_ep->super.super,
&rdmacm_iface->super.super, status);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/uct/ib/rdmacm/rdmacm_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ static UCS_CLASS_INIT_FUNC(uct_rdmacm_iface_t, uct_md_h md, uct_worker_h worker,
ip_port_str, UCS_SOCKADDR_STRING_LEN),
ntohs(rdma_get_src_port(self->cm_id)));

if (params->mode.sockaddr.cb_flags != UCT_CB_FLAG_ASYNC) {
ucs_fatal("UCT_CB_FLAG_SYNC is not supported");
if (!(params->mode.sockaddr.cb_flags & UCT_CB_FLAG_ASYNC)) {
ucs_fatal("Synchronous callback is not supported");
}

self->cb_flags = params->mode.sockaddr.cb_flags;
Expand Down
2 changes: 1 addition & 1 deletion src/uct/ib/ud/base/ud_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b
}

if (ucs_unlikely(is_async &&
(iface->super.super.am[am_id].flags & UCT_CB_FLAG_SYNC))) {
!(iface->super.super.am[am_id].flags & UCT_CB_FLAG_ASYNC))) {
skb->u.am.len = byte_len - sizeof(*neth);
ucs_queue_push(&iface->rx.pending_q, &skb->u.am.queue);
} else {
Expand Down
3 changes: 1 addition & 2 deletions test/examples/uct_hello_world.c
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ int main(int argc, char **argv)

/*Set active message handler */
status = uct_iface_set_am_handler(if_info.iface, id, hello_world,
&cmd_args.func_am_type,
UCT_CB_FLAG_SYNC);
&cmd_args.func_am_type, 0);
CHKERR_JUMP(UCS_OK != status, "set callback", out_free_ep);

if (cmd_args.server_name) {
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/uct/ib/test_cq_moderation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void test_uct_cq_moderation::run_test(uct_iface_h iface) {
check_caps(UCT_IFACE_FLAG_EVENT_SEND_COMP);
check_caps(UCT_IFACE_FLAG_EVENT_RECV);

uct_iface_set_am_handler(m_receiver->iface(), 0, am_cb, this, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_receiver->iface(), 0, am_cb, this, 0);

connect();

Expand Down
6 changes: 2 additions & 4 deletions test/gtest/uct/ib/test_dc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ class test_dc : public uct_test {
m_e2 = uct_test::create_entity(0);
m_entities.push_back(m_e2);

uct_iface_set_am_handler(m_e1->iface(), 0, am_dummy_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e1->iface(), 0, am_dummy_handler, NULL, 0);
uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler, NULL, 0);
}

static uct_dc_iface_t* dc_iface(entity *e) {
Expand Down
4 changes: 2 additions & 2 deletions test/gtest/uct/ib/test_ib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class test_uct_ib : public uct_test {
recv_buffer->length = 0; /* Initialize length to 0 */

/* set a callback for the uct to invoke for receiving the data */
uct_iface_set_am_handler(m_e2->iface(), 0, ib_am_handler , recv_buffer, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e2->iface(), 0, ib_am_handler , recv_buffer, 0);

/* send the data */
uct_ep_am_short(m_e1->ep(0), 0, test_ib_hdr, &send_data, sizeof(send_data));
Expand Down Expand Up @@ -362,7 +362,7 @@ class test_uct_event_ib : public test_uct_ib {

/* set a callback for the uct to invoke for receiving the data */
uct_iface_set_am_handler(m_e1->iface(), 0, ib_am_handler, m_buf1->ptr(),
UCT_CB_FLAG_SYNC);
0);

test_uct_event_ib::bcopy_pack_count = 0;
}
Expand Down
12 changes: 4 additions & 8 deletions test/gtest/uct/ib/test_rc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ void test_rc::connect()
m_e1->connect(0, *m_e2, 0);
m_e2->connect(0, *m_e1, 0);

uct_iface_set_am_handler(m_e1->iface(), 0, am_dummy_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e1->iface(), 0, am_dummy_handler, NULL, 0);
uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler, NULL, 0);
}


Expand Down Expand Up @@ -80,10 +78,8 @@ void test_rc_flow_control::init()
ucs_assert(rc_iface(m_e1)->config.fc_enabled);
ucs_assert(rc_iface(m_e2)->config.fc_enabled);

uct_iface_set_am_handler(m_e1->iface(), FLUSH_AM_ID, am_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e2->iface(), FLUSH_AM_ID, am_handler,
NULL, UCT_CB_FLAG_SYNC);
uct_iface_set_am_handler(m_e1->iface(), FLUSH_AM_ID, am_handler, NULL, 0);
uct_iface_set_am_handler(m_e2->iface(), FLUSH_AM_ID, am_handler, NULL, 0);

}

Expand Down
Loading

0 comments on commit a90c731

Please sign in to comment.