UCP/API/TEST: Add non-blocking endpoint flush and use it for RMA tests
The blocking version of endpoint flush may potentially cause a deadlock
because it does not progress communications on anything except the
current worker. Introduce a non-blocking flush and use it for unit
tests.
yosefe committed Oct 13, 2017
1 parent 910d4d4 commit 136ee66
Showing 17 changed files with 349 additions and 169 deletions.
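For context, here is a minimal usage sketch of the new non-blocking endpoint flush (not part of this commit): the caller passes a completion callback and keeps progressing whichever worker(s) it owns until the flush request completes, which is what avoids the deadlock described above. The flush_done() callback and flush_ep() helper are illustrative names, not UCX API.

#include <ucp/api/ucp.h>

static volatile int flush_completed = 0;

/* Completion callback passed to ucp_ep_flush_nb() (ucp_send_callback_t signature) */
static void flush_done(void *request, ucs_status_t status)
{
    flush_completed = 1;   /* 'status' carries the flush result */
}

/* Illustrative helper: flush 'ep' while progressing 'worker' ourselves */
static ucs_status_t flush_ep(ucp_worker_h worker, ucp_ep_h ep)
{
    void *request;

    flush_completed = 0;
    request = ucp_ep_flush_nb(ep, 0, flush_done);
    if (request == NULL) {
        return UCS_OK;                   /* completed immediately */
    } else if (UCS_PTR_IS_ERR(request)) {
        return UCS_PTR_STATUS(request);  /* flush failed up front */
    }

    /* The caller decides what to progress here - a test can progress both the
     * sender's and the receiver's workers, which the blocking ucp_ep_flush()
     * could not do. */
    while (!flush_completed) {
        ucp_worker_progress(worker);
    }

    ucp_request_free(request);
    return UCS_OK;
}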
58 changes: 40 additions & 18 deletions src/ucp/api/ucp.h
@@ -1588,19 +1588,30 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream);
/**
* @ingroup UCP_ENDPOINT
*
* @brief Flush outstanding AMO and RMA operations on the @ref ucp_ep_h
* "endpoint".
* @brief Non-blocking flush of outstanding AMO and RMA operations on the
* @ref ucp_ep_h "endpoint".
*
* This routine flushes all outstanding AMO and RMA communications on the
* @ref ucp_ep_h "endpoint". All the AMO and RMA operations issued on the
* @a ep prior to this call are completed both at the origin and at the target
* @ref ucp_ep_h "endpoint" when this call returns.
*
* @param [in] ep UCP endpoint.
*
* @return Error code as defined by @ref ucs_status_t
*/
ucs_status_t ucp_ep_flush(ucp_ep_h ep);
* @param [in] flags Flags for flush operation. Reserved for future use.
* @param [in] cb Callback which will be called when the flush operation
* completes.
*
* @return UCS_OK - The flush operation was completed immediately.
* @return UCS_PTR_IS_ERR(_ptr) - The flush operation failed.
* @return otherwise - Flush operation was scheduled and can be completed
* at any point in time. The request handle is returned
* to the application in order to track progress. The
* application is responsible for releasing the handle
* using the @ref ucp_request_free "ucp_request_free()"
* routine.
*/
ucs_status_ptr_t ucp_ep_flush_nb(ucp_ep_h ep, unsigned flags,
ucp_send_callback_t cb);


/**
@@ -2315,7 +2326,7 @@ ucs_status_t ucp_put(ucp_ep_h ep, const void *buffer, size_t length,
* completed immediately, the routine returns UCS_OK; otherwise UCS_INPROGRESS
* or an error is returned to the user.
*
* @note A user can use @ref ucp_worker_flush "ucp_worker_flush()"
* @note A user can use @ref ucp_worker_flush_nb "ucp_worker_flush_nb()"
* in order to guarantee re-usability of the source address @e buffer.
*
* @param [in] ep Remote endpoint handle.
@@ -2369,7 +2380,7 @@ ucs_status_t ucp_get(ucp_ep_h ep, void *buffer, size_t length,
* address. The routine returns immediately and @b does @b not guarantee that
* remote data is loaded and stored under the local address @e buffer.
*
* @note A user can use @ref ucp_worker_flush "ucp_worker_flush()" in order
* @note A user can use @ref ucp_worker_flush_nb "ucp_worker_flush_nb()" in order
* to guarantee that remote data is loaded and stored under the local address
* @e buffer.
*
@@ -2645,8 +2656,8 @@ ucs_status_t ucp_atomic_cswap64(ucp_ep_h ep, uint64_t compare, uint64_t swap,
* memory address @a remote_addr and the @ref ucp_rkey_h "remote memory handle"
* @a rkey.
* Return from the function does not guarantee completion. A user must
* call @ref ucp_ep_flush or @ref ucp_worker_flush to guarentee that the remote
* value has been updated.
* call @ref ucp_ep_flush_nb or @ref ucp_worker_flush_nb to guarantee that the
* remote value has been updated.
*
* @param [in] ep UCP endpoint.
* @param [in] opcode One of @ref ucp_atomic_post_op_t.
@@ -2855,10 +2866,10 @@ void ucp_dt_destroy(ucp_datatype_t datatype);
* which follow the call to @ref ucp_worker_fence "fence".
*
* @note The primary difference between @ref ucp_worker_fence "ucp_worker_fence()"
* and the @ref ucp_worker_flush "ucp_worker_flush()" is the fact the fence
* and the @ref ucp_worker_flush_nb "ucp_worker_flush_nb()" is that the fence
* routine does not guarantee completion of the operations on the call return but
* only ensures the order between communication operations. The
* @ref ucp_worker_flush "flush" operation on return guarantees that all
* @ref ucp_worker_flush_nb "flush" operation on return guarantees that all
* operations are completed and corresponding memory regions were updated.
*
* @param [in] worker UCP worker.
@@ -2879,15 +2890,26 @@ ucs_status_t ucp_worker_fence(ucp_worker_h worker);
* @a worker prior to this call are completed both at the origin and at the
* target when this call returns.
*
* @note For description of the differences between @ref ucp_worker_flush
* @note For description of the differences between @ref ucp_worker_flush_nb
* "flush" and @ref ucp_worker_fence "fence" operations please see
* @ref ucp_worker_fence "ucp_worker_fence()"
*
* @param [in] worker UCP worker.
*
* @return Error code as defined by @ref ucs_status_t
*/
ucs_status_t ucp_worker_flush(ucp_worker_h worker);
* @param [in] worker UCP worker.
* @param [in] flags Flags for flush operation. Reserved for future use.
* @param [in] cb Callback which will be called when the flush operation
* completes.
*
* @return UCS_OK - The flush operation was completed immediately.
* @return UCS_PTR_IS_ERR(_ptr) - The flush operation failed.
* @return otherwise - Flush operation was scheduled and can be completed
* at any point in time. The request handle is returned
* to the application in order to track progress. The
* application is responsible for releasing the handle
* using the @ref ucp_request_free "ucp_request_free()"
* routine.
*/
ucs_status_ptr_t ucp_worker_flush_nb(ucp_worker_h worker, unsigned flags,
ucp_send_callback_t cb);


/**
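Both new routines take a ucp_send_callback_t completion callback, whose definition is not shown in this diff. For reference, its signature elsewhere in ucp.h is:

/* Completion callback for non-blocking operations: 'request' is the handle
 * returned by the _nb routine and 'status' is the completion status. */
typedef void (*ucp_send_callback_t)(void *request, ucs_status_t status);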
27 changes: 27 additions & 0 deletions src/ucp/api/ucp_compat.h
@@ -56,4 +56,31 @@ ucs_status_ptr_t ucp_disconnect_nb(ucp_ep_h ep);
ucs_status_t ucp_request_test(void *request, ucp_tag_recv_info_t *info);


/**
* @ingroup UCP_ENDPOINT
* @deprecated Replaced by @ref ucp_ep_flush_nb.
*/
ucs_status_t ucp_ep_flush(ucp_ep_h ep);

/**
* @ingroup UCP_WORKER
*
* @brief Flush outstanding AMO and RMA operations on the @ref ucp_worker_h
* "worker"
*
* This routine flushes all outstanding AMO and RMA communications on the
* @ref ucp_worker_h "worker". All the AMO and RMA operations issued on the
* @a worker prior to this call are completed both at the origin and at the
* target when this call returns.
*
* @note For description of the differences between @ref ucp_worker_flush
* "flush" and @ref ucp_worker_fence "fence" operations please see
* @ref ucp_worker_fence "ucp_worker_fence()"
*
* @param [in] worker UCP worker.
*
* @return Error code as defined by @ref ucs_status_t
*/
ucs_status_t ucp_worker_flush(ucp_worker_h worker);

#endif
45 changes: 42 additions & 3 deletions src/ucp/core/ucp_ep.c
@@ -302,7 +302,6 @@ ucs_status_t ucp_ep_create(ucp_worker_h worker,
return status;
}


void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
@@ -361,10 +360,12 @@ void ucp_ep_destroy_internal(ucp_ep_h ep, const char *message)
ucs_free(ep);
}

void ucp_ep_disconnected(ucp_ep_h ep)
static void ucp_ep_disconnected(ucp_ep_h ep)
{
ucp_recv_desc_t *rdesc;

ucs_trace("ep %p is disconnected", ep);

while (!ucs_queue_is_empty(&ep->stream_data)) {
rdesc = ucs_queue_pull_elem_non_empty(&ep->stream_data, ucp_recv_desc_t,
stream_queue);
@@ -390,6 +391,36 @@ void ucp_ep_disconnected(ucp_ep_h ep)
ucp_ep_destroy_internal(ep, " from disconnect");
}

static unsigned ucp_ep_do_disconnect(void *arg)
{
ucp_request_t *req = arg;

ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

ucp_ep_disconnected(req->send.ep);

/* Complete send request from here, to avoid releasing the request while
* slow-path element is still pending */
ucp_request_complete_send(req, req->status);

return 0;
}

static void ucp_ep_close_flushed_callback(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;

/* If a flush is completed from a pending/completion callback, we need to
* schedule slow-path callback to release the endpoint later, since a UCT
* endpoint cannot be released from pending/completion callback context.
*/
ucs_trace("adding slow-path callback to destroy ep %p", ep);
req->send.disconnect.prog_id = UCS_CALLBACKQ_ID_NULL;
uct_worker_progress_register_safe(ep->worker->uct, ucp_ep_do_disconnect,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&req->send.disconnect.prog_id);
}

ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode)
{
ucp_worker_h worker = ep->worker;
Expand All @@ -403,7 +434,15 @@ ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode)
UCP_THREAD_CS_ENTER_CONDITIONAL(&worker->mt_lock);

UCS_ASYNC_BLOCK(&worker->async);
request = ucp_disconnect_nb_internal(ep, mode);
request = ucp_ep_flush_internal(ep,
(mode == UCP_EP_CLOSE_MODE_FLUSH) ?
UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL,
NULL, 0,
ucp_ep_close_flushed_callback);
if (!UCS_PTR_IS_PTR(request)) {
ucp_ep_disconnected(ep);
}

UCS_ASYNC_UNBLOCK(&worker->async);

UCP_THREAD_CS_EXIT_CONDITIONAL(&worker->mt_lock);
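With this change, ucp_ep_close_nb() is implemented as a flush (UCT_FLUSH_FLAG_LOCAL or UCT_FLUSH_FLAG_CANCEL, depending on the close mode) followed by the disconnect callback. A caller-side sketch, not part of the commit, assuming ucp_request_check_status() is available (older releases exposed ucp_request_test() / ucp_request_is_completed() instead):

#include <ucp/api/ucp.h>

/* Illustrative helper: close 'ep' and wait for the flush-and-disconnect
 * request to complete by progressing the worker. */
static void close_ep(ucp_worker_h worker, ucp_ep_h ep)
{
    void *close_req = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FLUSH);

    if (UCS_PTR_IS_PTR(close_req)) {
        while (ucp_request_check_status(close_req) == UCS_INPROGRESS) {
            ucp_worker_progress(worker);
        }
        ucp_request_free(close_req);
    }
    /* else: the close completed (or failed) immediately and there is
     * nothing to wait for or release */
}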
7 changes: 4 additions & 3 deletions src/ucp/core/ucp_ep.h
@@ -223,9 +223,10 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, uint64_t dest_uuid,
const char *peer_name, const char *message,
ucp_ep_h *ep_p);

ucs_status_ptr_t ucp_disconnect_nb_internal(ucp_ep_h ep, unsigned mode);

void ucp_ep_disconnected(ucp_ep_h ep);
ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
ucp_send_callback_t req_cb,
unsigned req_flags,
ucp_request_callback_t flushed_cb);

ucs_status_t ucp_ep_create_stub(ucp_worker_h worker, uint64_t dest_uuid,
const char *message, ucp_ep_h *ep_p);
22 changes: 14 additions & 8 deletions src/ucp/core/ucp_request.h
@@ -68,10 +68,6 @@ enum {
};


/* Callback for UCP requests */
typedef void (*ucp_request_callback_t)(ucp_request_t *req);


/**
* Request in progress.
*/
@@ -118,13 +114,17 @@ struct ucp_request {
} rndv_get;

struct {
ucp_request_callback_t flushed_cb;/* Called when flushed */
uct_worker_cb_id_t slow_cb_id;/* Slow-path callback */
ucp_lane_map_t lanes; /* Which lanes need to be flushed */
unsigned uct_flags; /* Flags to pass to
ucp_request_callback_t flushed_cb;/* Called when flushed */
uct_worker_cb_id_t prog_id; /* Progress callback ID */
ucp_lane_map_t lanes; /* Which lanes need to be flushed */
unsigned uct_flags; /* Flags to pass to
@ref uct_ep_flush */
} flush;

struct {
uct_worker_cb_id_t prog_id;/* Slow-path callback */
} disconnect;

struct {
uint64_t remote_addr; /* Remote address */
ucp_atomic_fetch_op_t op; /* Requested AMO */
@@ -167,6 +167,12 @@ struct ucp_request {
ucp_mem_desc_t *rdesc;
uct_tag_context_t uct_ctx; /* Transport offload context */
} recv;

struct {
ucp_worker_h worker; /* Worker to flush */
ucp_send_callback_t cb; /* Completion callback */
uct_worker_cb_id_t prog_id; /* Progress callback ID */
} flush_worker;
};
};

6 changes: 6 additions & 0 deletions src/ucp/core/ucp_types.h
@@ -94,4 +94,10 @@ typedef void (*ucp_am_tracer_t)(ucp_worker_h worker, uct_am_trace_type_t type,
char *buffer, size_t max);


/**
* Internal callback for UCP requests
*/
typedef void (*ucp_request_callback_t)(ucp_request_t *req);


#endif
(Diffs for the remaining 11 changed files are not shown here.)
