Skip to content

Commit

Permalink
Merge pull request #5 from artpol84/osc/mt_v2
Browse files Browse the repository at this point in the history
Add Operation function
  • Loading branch information
xinzhao3 authored Nov 12, 2018
2 parents 3075638 + d3926f7 commit c7c2f68
Showing 1 changed file with 264 additions and 1 deletion.
265 changes: 264 additions & 1 deletion opal/mca/common/ucx/common_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ OBJ_CLASS_DECLARATION(_mem_info_t);

typedef struct {
int mem_id;
int is_freed;
opal_common_ucx_mem_t *gmem;
_mem_info_t *mem;
} _tlocal_mem_t;
Expand Down Expand Up @@ -451,7 +452,6 @@ OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
return ret;
}


OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
{
/* Go over the list, free idle list items */
Expand Down Expand Up @@ -616,3 +616,266 @@ OPAL_DECLSPEC int opal_common_ucx_mem_create(opal_common_ucx_ctx_t *ctx, int com
(*mem_ptr) = NULL;
return ret;
}

static int
_tlocal_extend_ctxtbl(_tlocal_table_t *tbl, size_t append)
{
size_t i;
tbl->ctx_tbl = realloc(tbl->ctx_tbl, newsize * sizeof(*tbl->ctx_tbl));
for (i = tbl->ctx_tbl_size; i < tbl->ctx_tbl_size + append; i++) {
tbl->ctx_tbl[i] = calloc(1, sizeof(_thr_local_cctx_t));
if (NULL == tbl->ctx_tbl[i]) {
return OPAL_ERR_OUT_OF_RESOURCE;
}

}
tbl->ctx_tbl_size += append;
return OPAL_SUCCESS;
}
static int
_tlocal_extend_memtbl(_tlocal_table_t *tbl, size_t append)
{
size_t i;
tbl->mem_tbl = realloc(tbl->mem_tbl, newsize * sizeof(*tbl->mem_tbl));
for (i = tbl->mem_tbl_size; i < tbl->mem_tbl_size + append; i++) {
tbl->mem_tbl[i] = calloc(1, sizeof(*tbl->mem_tbl[i]));
if (NULL == tbl->mem_tbl[i]) {
return OPAL_ERR_OUT_OF_RESOURCE;
}

}
tbl->mem_tbl_size += append;
return OPAL_SUCCESS;
}

// TODO: don't want to inline this function
static _tlocal_table_t* _common_ucx_init_tls(opal_common_ucx_wpool_t *wpool)
{
_tlocal_table_t *tls = NULL;
tls = OBJ_NEW(_tlocal_table_t);
memset(tls, 0, sizeof(tls));

/* Add this TLS to the global wpool structure for future
* cleanup purposes */
tls->wpool = wpool;
opal_mutex_lock(&wpool->mutex);
opal_list_append(&wpool->tls_list, &tls->super);
opal_mutex_unlock(&wpool->mutex);

if( _tlocal_extend_ctxtbl(tls, 4) ){
// TODO: handle error
}
if(_tlocal_extend_memtbl(tls, 4)) {
// TODO: handle error
}
pthread_set_specific(_tlocal_key, tls);
return tls;
}

static inline _worker_info_t *_tlocal_search_ctx(_tlocal_table_t *tls, int ctx_id)
{
int i;
for(i=0; i<tls->ctx_tbl_size; i++) {
if( tls->ctx_tbl[i] == ctx_id){
return tls->ctx_tbl[i]->worker;
}
}
return NULL;
}

// TODO: Don't want to inline this (slow path)
static _worker_info_t *_tlocal_add_ctx(_tlocal_table_t *tls,
opal_common_ucx_ctx_t *ctx)
{
int i, rc;

/* Try to find tavailable spot in the table */
for (i=0; i<tls->ctx_tbl_size; i++) {
if (tls->ctx_tbl[i]->is_freed) {
/* Cleanup the record */
_tlocal_cleanup_ctx_record(tls->ctx_tbl[i]);
break;
}
}

if( tls->ctx_tbl_size >= i ){
i = tls->ctx_tbl_size;
if( rc = _tlocal_extend_ctxtbl(tls, 4) ){
//TODO: error out
return NULL;
}
}
tls->ctx_tbl[i]->ctx_id = ctx->ctx_id;
tls->ctx_tbl[i]->gctx = ctx;
tls->ctx_tbl[i]->is_freed = 0;
tls->ctx_tbl[i]->worker = _get_new_worker(tls);

/* Make sure that we completed all the data structures before
* placing the item to the list
* NOTE: essentially we don't need this as list append is an
* operation protected by mutex
*/
opal_atomic_wmb();

_ctx_append_worker(ctx, tls->ctx_tbl[i]);

return tls->ctx_tbl[i];
}

static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx, int target)
{
ucp_ep_params_t ep_params;
_worker_info_t *winfo = ctx->worker;
opal_common_ucx_ctx_t *gctx = ctx->gctx;
int displ;

memset(&ep_params, 0, sizeof(ucp_ep_params_t));
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;

opal_mutex_lock(&winfo->ctx->mutex);
displ = gctx->recv_worker_displs[target];
ep_params.address = (ucp_address_t *)&(gctx->recv_worker_addrs[disp]);
status = ucp_ep_create(winfo->worker, &ep_params, &winfo->endpoints[target]);
if (status != UCS_OK) {
// TODO: error out here
// OSC_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status);
ret = OPAL_ERROR;
}
return OPAL_SUCCESS;
}

static inline _worker_info_t *
_tlocal_search_mem(_tlocal_table_t *tls, int mem_id)
{
int i;
for(i=0; i<tls->mem_tbl_size; i++) {
if( tls->mem_tbl[i] == mem_id){
return tls->mem_tbl[i]->mem;
}
}
return NULL;
}

// TODO: Don't want to inline this (slow path)
static _worker_info_t *_tlocal_add_mem(_tlocal_table_t *tls,
opal_common_ucx_ctx_t *mem)
{
int i, rc;

/* Try to find tavailable spot in the table */
for (i=0; i<tls->mem_tbl_size; i++) {
if (tls->mem_tbl[i]->is_freed) {
/* Cleanup the record */
_tlocal_cleanup_mem_record(tls->mem_tbl[i]);
break;
}
}

if( tls->mem_tbl_size >= i ){
i = tls->mem_tbl_size;
if( rc = _tlocal_extend_memtbl(tls, 4) ){
//TODO: error out
return NULL;
}
}
tls->mem_tbl[i]->mem_id = mem->mem_id;
tls->mem_tbl[i]->gmem = mem;
tls->mem_tbl[i]->is_freed = 0;
tls->mem_tbl[i]->mem = _get_new_memory(tls, mem);

/* Make sure that we completed all the data structures before
* placing the item to the list
* NOTE: essentially we don't need this as list append is an
* operation protected by mutex
*/
opal_atomic_wmb();

_mem_append_rkey(mem, tls->mem_tbl[i]);

return tls->mem_tbl[i];
}

static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
{
_mem_info_t *minfo = mem_rec->mem;
opal_common_ucx_mem_t *gmem = mem_rec->gmem;
int displ = gmem->mem_displs[target];

status = ucp_ep_rkey_unpack(ep, &gmem->mem_addrs[displ],
&minfo->rkeys[target]);
if (status != UCS_OK) {
// TODO: error out here
// OSC_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status);
ret = OPAL_ERROR;
}
return OPAL_SUCCESS;
}

OPAL_DECLSPEC int opal_common_ucx_mem_op(opal_common_ucx_mem_t *mem,
opal_common_ucx_op_t op,
int target,
void *buffer, size_t len,
uint64_t rem_addr)
{
_tlocal_table_t *tls = NULL;
_worker_info_t *worker_info;
_mem_info_t *mem_info;
ucp_ep_h ep;
ucp_rkey_h rkey;

tls = pthread_get_specific(_tlocal_key);
if( OPAL_UNLIKELY(NULL == tls) ) {
tls = _common_ucx_init_tls(mem->ctx->wpool);
}
/* Obtain the worker structure */
worker_info = _tlocal_search_ctx(tls, mem->ctx->ctx_id);
if (OPAL_UNLIKELY(NULL == worker_info)) {
worker_info = _tlocal_add_ctx(tls, mem->ctx);
}

/* Obtain the endpoint */
if (OPAL_UNLIKELY(NULL == worker_info->endpoints[target])) {
if (rc = _tlocal_ctx_connect(worker_info, target)) {
return rc;
}
}
ep = worker_info->endpoints[target];

/* Obtain the memory region info */
mem_info = _tlocal_search_mem(tls, mem->mem_id);
if (OPAL_UNLIKELY(mem_info == NULL)) {
mem_info = _tlocal_add_mem(tls, mem->mem_id);
}

/* Obtain the rkey */
if (OPAL_UNLIKELY(NULL == mem_info->rkeys[target])) {
// Create the rkey
if (rc = _tlocal_mem_rkey_create(mem_info, target)) {
return rc;
}
}
rkey = mem_info->rkeys[target];

/* Perform the operation */
opal_mutex_lock(worker_info->mutex);
switch(op){
case OPAL_COMMON_UCX_GET:
status = ucp_put_nbi(ep, buffer,len, rem_addr, rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
// TODO: Fix the output
// OSC_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
return OPAL_ERROR;
}
break;
case OPAL_COMMON_UCX_PUT:
status = ucp_get_nbi(ep, buffer,len, rem_addr, rkey);
if (status != UCS_OK && status != UCS_INPROGRESS) {
// TODO: Fix the output
// OSC_UCX_VERBOSE(1, "ucp_put_nbi failed: %d", status);
return OPAL_ERROR;
}
break;
}
opal_mutex_unlock(worker_info->mutex);
}

0 comments on commit c7c2f68

Please sign in to comment.