Skip to content

Commit

Permalink
Merge pull request #7 from artpol84/osc/mt_v2
Browse files Browse the repository at this point in the history
Intermediate fixes
  • Loading branch information
xinzhao3 authored Nov 12, 2018
2 parents 113f2ff + 2f1ac68 commit ccfb889
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 17 deletions.
101 changes: 87 additions & 14 deletions opal/mca/common/ucx/common_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "opal/memoryhooks/memory.h"

#include <ucm/api/ucm.h>
#include <pthread.h>

/***********************************************************************/

Expand Down Expand Up @@ -318,7 +319,7 @@ static ucp_worker_h _create_ctx_worker(opal_common_ucx_wpool_t *wpool)
return worker;
}

static void _wpool_add_to_idle(opal_common_ucx_wpool_t *wpool,
static int _wpool_add_to_idle(opal_common_ucx_wpool_t *wpool,
_worker_info_t *wkr)
{
_idle_list_item_t *item;
Expand All @@ -334,11 +335,15 @@ static void _wpool_add_to_idle(opal_common_ucx_wpool_t *wpool,
}

item = OBJ_NEW(_idle_list_item_t);
if (NULL == item) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
item->ptr = wkr;

opal_mutex_lock(&wpool->mutex);
opal_list_append(&wpool->idle_workers, &item->super);
opal_mutex_unlock(&wpool->mutex);
return OPAL_SUCCESS;
}

static _worker_info_t* _wpool_remove_from_idle(opal_common_ucx_wpool_t *wpool)
Expand All @@ -361,7 +366,7 @@ static _worker_info_t* _wpool_remove_from_idle(opal_common_ucx_wpool_t *wpool)
return wkr;
}

OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate()
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void)
{
opal_common_ucx_wpool_t *ptr = calloc(1, sizeof(opal_common_ucx_wpool_t *));
return ptr;
Expand Down Expand Up @@ -513,6 +518,48 @@ OPAL_DECLSPEC int opal_common_ucx_ctx_create(opal_common_ucx_wpool_t *wpool, int
return ret;
}

static int _common_ucx_ctx_free(opal_common_ucx_ctx_t *ctx)
{
free(ctx->recv_worker_addrs);
free(ctx->recv_worker_displs);
OBJ_DESTRUCT(&ctx->mutex);
OBJ_DESTRUCT(&ctx->workers);
free(ctx);
}

OPAL_DECLSPEC int opal_common_ucx_ctx_release(opal_common_ucx_ctx_t *ctx)
{
// TODO: implement
_tlocal_ctx_release(ctx);
}

static void
_common_ucx_ctx_remove(opal_common_ucx_ctx_t *ctx, _worker_info_t *ctx_rec)
{
int can_free = 0;
_worker_list_item_t *item = NULL, next;

opal_mutex_lock(&ctx->mutex);
OPAL_LIST_FOREACH_SAFE(item, next, &ctx->workers, _worker_list_item_t) {
if (ctx_rec == item->ptr) {
opal_list_remove_item(&ctx->workers, &item->super);
OBJ_RELEASE(item);
break;
}
}
if (0 == opal_list_get_size()) {
can_free = 1;
}
opal_mutex_unlock(&ctx->mutex);

if (can_free) {
/* All references to this data structure are removed
* we can safely release communication context structure
*/
_common_ucx_ctx_free(ctx);
}
}

static int _comm_ucx_mem_map(opal_common_ucx_wpool_t *wpool,
void **base, size_t size, ucp_mem_h *memh_ptr,
opal_common_ucx_mem_type_t mem_type)
Expand Down Expand Up @@ -622,30 +669,32 @@ static int
_tlocal_extend_ctxtbl(_tlocal_table_t *tbl, size_t append)
{
size_t i;
size_t newsize = (tbl->ctx_tbl_size + append);
tbl->ctx_tbl = realloc(tbl->ctx_tbl, newsize * sizeof(*tbl->ctx_tbl));
for (i = tbl->ctx_tbl_size; i < tbl->ctx_tbl_size + append; i++) {
tbl->ctx_tbl[i] = calloc(1, sizeof(_thr_local_cctx_t));
for (i = tbl->ctx_tbl_size; i < newsize; i++) {
tbl->ctx_tbl[i] = calloc(1, sizeof(*tbl->ctx_tbl[i]));
if (NULL == tbl->ctx_tbl[i]) {
return OPAL_ERR_OUT_OF_RESOURCE;
}

}
tbl->ctx_tbl_size += append;
tbl->ctx_tbl_size = newsize;
return OPAL_SUCCESS;
}
static int
_tlocal_extend_memtbl(_tlocal_table_t *tbl, size_t append)
{
size_t i;
size_t newsize = (tbl->ctx_tbl_size + append);

tbl->mem_tbl = realloc(tbl->mem_tbl, newsize * sizeof(*tbl->mem_tbl));
for (i = tbl->mem_tbl_size; i < tbl->mem_tbl_size + append; i++) {
tbl->mem_tbl[i] = calloc(1, sizeof(*tbl->mem_tbl[i]));
if (NULL == tbl->mem_tbl[i]) {
return OPAL_ERR_OUT_OF_RESOURCE;
}

}
tbl->mem_tbl_size += append;
tbl->mem_tbl_size = newsize;
return OPAL_SUCCESS;
}

Expand All @@ -654,7 +703,7 @@ static _tlocal_table_t* _common_ucx_init_tls(opal_common_ucx_wpool_t *wpool)
{
_tlocal_table_t *tls = NULL;
tls = OBJ_NEW(_tlocal_table_t);
memset(tls, 0, sizeof(tls));
memset(tls, 0, sizeof(*tls));

/* Add this TLS to the global wpool structure for future
* cleanup purposes */
Expand All @@ -669,26 +718,51 @@ static _tlocal_table_t* _common_ucx_init_tls(opal_common_ucx_wpool_t *wpool)
if(_tlocal_extend_memtbl(tls, 4)) {
// TODO: handle error
}
pthread_set_specific(_tlocal_key, tls);
pthread_setspecific(_tlocal_key, tls);
return tls;
}

static inline _worker_info_t *_tlocal_search_ctx(_tlocal_table_t *tls, int ctx_id)
{
int i;
size_t i;
for(i=0; i<tls->ctx_tbl_size; i++) {
if( tls->ctx_tbl[i] == ctx_id){
return tls->ctx_tbl[i]->worker;
if( tls->ctx_tbl[i]->ctx_id == ctx_id){
return tls->ctx_tbl[i]->winfo;
}
}
return NULL;
}

static int _tlocal_cleanup_ctx_record(_tlocal_ctx_t *ctx_rec)
{
int rc;
if (!ctx_rec->is_freed) {
return OPAL_SUCCESS;
}
/* Remove myself from the communication context structure
* This may result in context release as we are using
* delayed cleanup */
rc = _common_ucx_ctx_remove(ctx_rec->gctx, ctx_rec);
if (rc) {
return rc;
}

/* Return the worker back to the
* This may result in context release as we are using
* delayed cleanup */
rc = _wpool_add_to_idle(ctx_rec->gctx->wpool, ctx_rec->winfo);
if (rc) {
return rc;
}
memset(ctx_rec, 0, sizeof(*ctx_rec));
}

// TODO: Don't want to inline this (slow path)
static _worker_info_t *_tlocal_add_ctx(_tlocal_table_t *tls,
opal_common_ucx_ctx_t *ctx)
{
int i, rc;
size_t i;
int rc;

/* Try to find tavailable spot in the table */
for (i=0; i<tls->ctx_tbl_size; i++) {
Expand All @@ -708,7 +782,6 @@ static _worker_info_t *_tlocal_add_ctx(_tlocal_table_t *tls,
}
tls->ctx_tbl[i]->ctx_id = ctx->ctx_id;
tls->ctx_tbl[i]->gctx = ctx;
tls->ctx_tbl[i]->is_freed = 0;
tls->ctx_tbl[i]->winfo = _get_new_worker(tls, ctx);

/* Make sure that we completed all the data structures before
Expand Down
6 changes: 3 additions & 3 deletions opal/mca/common/ucx/common_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ typedef struct {
} opal_common_ucx_wpool_t;

typedef struct {
int ctx_id;
opal_atomic_int32_t ctx_id;
opal_mutex_t mutex;
opal_common_ucx_wpool_t *wpool; /* which wpool this ctx belongs to */
opal_list_t workers; /* active worker lists */
Expand All @@ -118,7 +118,7 @@ typedef struct {
} opal_common_ucx_ctx_t;

typedef struct {
int mem_id;
opal_atomic_int32_t mem_id;
opal_mutex_t mutex;
opal_common_ucx_ctx_t *ctx; /* which ctx this mem_reg belongs to */
ucp_mem_h memh;
Expand Down Expand Up @@ -147,7 +147,7 @@ typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len
char **recv_info, int **disps,
void *metadata);

OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate();
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
int proc_world_size,
Expand Down

0 comments on commit ccfb889

Please sign in to comment.