Skip to content

Commit

Permalink
Merge pull request openucx#41 from Artemy-Mellanox/topic/sig
Browse files Browse the repository at this point in the history
UCT/DEVX/SIG: Add signature attributes
  • Loading branch information
gleon99 authored Jun 4, 2023
2 parents 34795d0 + 2b8a61d commit 13003a5
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 56 deletions.
31 changes: 17 additions & 14 deletions examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include <arpa/inet.h> /* inet_addr */
#include <unistd.h> /* getopt */
#include <stdlib.h> /* atoi */
#include <sys/mman.h>

#define DEFAULT_PORT 13337
#define IP_STRING_LEN 50
Expand All @@ -51,6 +50,7 @@
#define TEST_AM_ID 0
#define ALLOCATOR_NUM_OF_BUFFERS 32768
#define ALLOCATOR_PAYLOAD_LENGTH 8192
#define SIGNATURE_BLOCK 4048


static long test_string_length = 16;
Expand All @@ -60,6 +60,7 @@ static sa_family_t ai_family = AF_INET;
static int num_iterations = DEFAULT_NUM_ITERATIONS;
static int user_allocator = 0;
static int sig = 0;
static int sig_block = SIGNATURE_BLOCK;
int next_held_data_desc_idx = 0;
static void *held_data_descs[ALLOCATOR_NUM_OF_BUFFERS];

Expand Down Expand Up @@ -836,8 +837,7 @@ memory_allocator_chunk_alloc(ucs_mpool_t *mp, size_t *size_p, void **chunk_p)
ucs_status_t status;
ucp_mem_map_params_t params;

*chunk_p = mmap(NULL, chunk_size, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
*chunk_p = malloc(chunk_size);
if (*chunk_p == NULL) {
return UCS_ERR_NO_MEMORY;
}
Expand All @@ -863,16 +863,12 @@ static void memory_allocator_chunk_release(ucs_mpool_t *mp, void *chunk)
const ucp_worker_memory_allocator_obj_t *allocator =
(ucp_worker_memory_allocator_obj_t*)mp;
ucp_mem_attr_t memh_attr;
int ret;

memh_attr.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS |
UCP_MEM_ATTR_FIELD_LENGTH;
ucp_mem_query(allocator->memh, &memh_attr);
ucp_mem_unmap(allocator->context, allocator->memh);
ret = munmap(memh_attr.address, memh_attr.length);
if (ret != 0) {
fprintf(stderr, "munmap() failed: %m\n");
}
free(memh_attr.address);
}

static ucs_mpool_ops_t memory_allocator_ops = {
Expand All @@ -884,15 +880,20 @@ static ucs_mpool_ops_t memory_allocator_ops = {
};

static ucs_status_t
memory_allocator_init(ucp_context_h context, const size_t buffer_size,
memory_allocator_init(ucp_context_h context,
ucp_worker_memory_allocator_obj_t **allocator_obj)
{
ucp_worker_memory_allocator_obj_t *allocator =
(ucp_worker_memory_allocator_obj_t*)malloc(
sizeof(ucp_worker_memory_allocator_obj_t));
size_t buffer_size = ALLOCATOR_PAYLOAD_LENGTH;
ucs_mpool_params_t mp_params;
ucs_status_t status;

if (sig) {
buffer_size = sig_block;
}

ucs_mpool_params_reset(&mp_params);
allocator->memh = NULL;
allocator->context = context;
Expand All @@ -912,9 +913,7 @@ memory_allocator_init(ucp_context_h context, const size_t buffer_size,
mp_params.max_chunk_size = -1;
mp_params.ops = &memory_allocator_ops;
mp_params.name = "mpool_allocator";
if (sig) {
mp_params.alignment = 4096;
}
mp_params.alignment = 1;
status = ucs_mpool_init(&mp_params, &allocator->mpool);
if (status != UCS_OK) {
return status;
Expand Down Expand Up @@ -985,6 +984,11 @@ static int init_worker(ucp_context_h ucp_context, ucp_worker_h *ucp_worker,
if (sig) {
worker_params.field_mask |= UCP_WORKER_PARAM_FIELD_FLAGS;
worker_params.flags |= UCP_WORKER_FLAG_SIGNATURE;

worker_params.user_allocator.stride = sig_block +
sizeof(ucs_mpool_elem_t);
worker_params.user_allocator.offset = sizeof(ucs_mpool_elem_t) +
sizeof(ucs_mpool_chunk_t);
}
}

Expand Down Expand Up @@ -1143,8 +1147,7 @@ static int run_server(ucp_context_h ucp_context, ucp_worker_h ucp_worker,
int ret;

if (user_allocator) {
status = memory_allocator_init(ucp_context, ALLOCATOR_PAYLOAD_LENGTH,
&allocator_obj);
status = memory_allocator_init(ucp_context, &allocator_obj);
if (status != UCS_OK) {
ret = -1;
fprintf(stderr, "failed to create memory allocator (%s)\n",
Expand Down
13 changes: 12 additions & 1 deletion src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ typedef size_t (*ucp_mem_allocator_cb_t)(void *arg, size_t num_of_buffers,

typedef struct {
/**
* User memory allocator get buf function used by UCX in post receive.
* User memory allocator get buf function used by UCX in post receive.
*/
ucp_mem_allocator_cb_t cb;

Expand All @@ -1257,6 +1257,17 @@ typedef struct {
* This will be the size of the active message fragment.
*/
size_t buffer_size;

/**
* Stride in buffer provided by memory allocator. Should be greater than
* or equal to buffer_size.
*/
size_t stride;

/**
* Offset of first block in buffer provided by memory allocator.
*/
size_t offset;
} ucp_user_mem_allocator_t;


Expand Down
34 changes: 19 additions & 15 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1132,15 +1132,6 @@ static ucs_status_t ucp_worker_add_resource_ifaces(ucp_worker_h worker)
iface_params.mode.device.dev_name = resource->tl_rsc.dev_name;
}

iface_params.field_mask |= UCT_IFACE_PARAM_FIELD_RX_HEADER_LENGTH;
if (worker->user_mem_allocator.cb != NULL) {
iface_params.field_mask |= UCT_IFACE_PARAM_FIELD_USER_ALLOCATOR |
UCT_IFACE_PARAM_FIELD_RX_PAYLOAD_LENGTH;
iface_params.rx_allocator.cb = ucp_worker_user_allocator_get_cb;
iface_params.rx_payload_length =
worker->user_mem_allocator.buffer_size;
}

status = ucp_worker_iface_open(worker, tl_id, &iface_params,
&worker->ifaces[iface_id++]);
if (status != UCS_OK) {
Expand Down Expand Up @@ -1343,8 +1334,24 @@ ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id,
iface_params->field_mask |= UCT_IFACE_PARAM_FIELD_FEATURES;
iface_params->features = ucp_worker_get_uct_features(worker->context);

if (worker->flags & UCP_WORKER_FLAG_SIGNATURE) {
iface_params->features |= UCT_IFACE_FEATURE_SIGNATURE;
iface_params->field_mask |= UCT_IFACE_PARAM_FIELD_RX_HEADER_LENGTH;

if (worker->user_mem_allocator.cb != NULL) {
iface_params->field_mask |= UCT_IFACE_PARAM_FIELD_USER_ALLOCATOR |
UCT_IFACE_PARAM_FIELD_RX_PAYLOAD_LENGTH;
iface_params->rx_allocator.cb = ucp_worker_user_allocator_get_cb;
iface_params->rx_payload_length =
worker->user_mem_allocator.buffer_size;

if (worker->flags & UCP_WORKER_FLAG_SIGNATURE) {
iface_params->features |= UCT_IFACE_FEATURE_SIGNATURE;
iface_params->rx_allocator.sig_attr.block =
worker->user_mem_allocator.buffer_size;
iface_params->rx_allocator.sig_attr.stride =
worker->user_mem_allocator.stride;
iface_params->rx_allocator.sig_attr.offset =
worker->user_mem_allocator.offset;
}
}

/* Open UCT interface */
Expand Down Expand Up @@ -2394,10 +2401,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
goto err_free;
}

worker->user_mem_allocator.buffer_size =
params->user_allocator.buffer_size;
worker->user_mem_allocator.cb = params->user_allocator.cb;
worker->user_mem_allocator.arg = params->user_allocator.arg;
worker->user_mem_allocator = params->user_allocator;
}

/* Create statistics */
Expand Down
25 changes: 25 additions & 0 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,28 @@ struct uct_iface_attr {
};


/*
* @ingroup UCT_RESOURCE
* @RX signature offload attributes
*
* Attributes describing how the data buffers provided by the RX allocator
* are laid out, so that they can be used in signature offload.
*/

struct uct_sig_attr {
/* Payload block size, in bytes, to be covered by a data integrity field.
* Currently the only supported value is 4048; the mlx5 signature MR
* initialization rejects any other value with UCS_ERR_INVALID_PARAM. */
size_t block;

/* Stride, in bytes, between consecutive blocks in the buffer provided by
* the memory allocator. Should be greater than or equal to block. */
size_t stride;

/* Offset, in bytes, of the first block from the start of the buffer
* provided by the memory allocator. */
size_t offset;
};


/*
* @ingroup UCT_RESOURCE
* @RX buffers allocator obj
Expand All @@ -1099,6 +1121,9 @@ struct uct_rx_allocator {

/* User-defined argument for the allocator callback */
void *arg;

/* Signature offload attributes */
uct_sig_attr_t sig_attr;
};


Expand Down
1 change: 1 addition & 0 deletions src/uct/api/uct_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ typedef struct uct_tag_context uct_tag_context_t;
typedef uint64_t uct_tag_t; /* tag type - 64 bit */
typedef int uct_worker_cb_id_t;
typedef void* uct_conn_request_h;
typedef struct uct_sig_attr uct_sig_attr_t;
typedef struct uct_rx_allocator uct_rx_allocator_t;

/**
Expand Down
3 changes: 1 addition & 2 deletions src/uct/base/uct_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,7 @@ uct_base_iface_init_rx_buffers_allocator(uct_base_iface_t *iface,
}

iface->rx_allocator.user_allocator = 1;
iface->rx_allocator.allocator.cb = params->rx_allocator.cb;
iface->rx_allocator.allocator.arg = params->rx_allocator.arg;
iface->rx_allocator.allocator = params->rx_allocator;
} else {
/*
* We use this mpool config to prevent the default
Expand Down
33 changes: 20 additions & 13 deletions src/uct/ib/mlx5/dv/sig.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#define UCT_IB_MLX5_BSF_REPEAT_BLOCK UCS_BIT(7)
#define UCT_IB_MLX5_MKEY_BSF_EN UCS_BIT(30)
#define UCT_IB_MLX5_WQE_UMR_CTRL_MKEY_MASK_BSF_ENABLE UCS_BIT(12)

#define UCS_IB_MLX5_SIGNATURE_BLOCK_4048 4048

struct uct_ib_mlx5_bsf_inl {
uint16_t vld_refresh;
Expand Down Expand Up @@ -257,7 +257,8 @@ static void* uct_ib_mlx5_sig_umr_start(uct_ib_mlx5_txwq_t *txwq,
}

ucs_status_t uct_ib_mlx5_sig_mr_init(uct_ib_mlx5_md_t *md,
uct_ib_mlx5_mem_t *memh)
uct_ib_mlx5_mem_t *memh,
const uct_sig_attr_t *sig_attr)
{
uct_ib_mlx5_txwq_t *txwq = uct_ib_umr_get_txwq(md->umr);
uct_ib_mlx5_sig_t *sig;
Expand All @@ -266,17 +267,23 @@ ucs_status_t uct_ib_mlx5_sig_mr_init(uct_ib_mlx5_md_t *md,
size_t length;
void *wqe;

if (sig_attr->block != UCS_IB_MLX5_SIGNATURE_BLOCK_4048) {
ucs_error("unsuported signature block size: %zd", sig_attr->block);
return UCS_ERR_INVALID_PARAM;
}

sig = ucs_malloc(sizeof(*sig), "sig");
if (sig == NULL) {
ucs_error("Cannot allocate signature context");
return UCS_ERR_NO_MEMORY;
}

sig->mr = memh->mrs[UCT_IB_MR_DEFAULT].super.ib;
ucs_assert_always(!ucs_check_if_align_pow2((uintptr_t)sig->mr->addr,
UCT_IB_MLX5_T10DIF_BLOCK));
length = sig->mr->length;
num_elems = length / UCT_IB_MLX5_T10DIF_BLOCK;
sig->block = sig_attr->block;
sig->stride = sig_attr->stride;
sig->mr = memh->mrs[UCT_IB_MR_DEFAULT].super.ib;
sig->start = sig->mr->addr + sig_attr->offset;
length = sig->mr->length;
num_elems = length / sig->block;

sig->dif = ucs_mmap(NULL, num_elems * 8, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_ANONYMOUS, -1, 0, "sig dif");
Expand All @@ -297,15 +304,15 @@ ucs_status_t uct_ib_mlx5_sig_mr_init(uct_ib_mlx5_md_t *md,
goto err_dereg_dif;
}

wqe = uct_ib_mlx5_sig_umr_start(txwq, 4, 4, (UCT_IB_MLX5_T10DIF_BLOCK + 8) * num_elems);
wqe = uct_ib_mlx5_sig_umr_start(txwq, 4, 4, (sig->block + 8) * num_elems);
wqe = uct_ib_mlx5_txwq_wrap_exact(txwq, wqe);
wqe = uct_ib_mlx5_sig_set_stride_ctrl(wqe, num_elems, UCT_IB_MLX5_T10DIF_BLOCK + 8, 2);
wqe = uct_ib_mlx5_sig_set_stride(wqe, sig->mr->lkey, UCT_IB_MLX5_T10DIF_BLOCK,
UCT_IB_MLX5_T10DIF_STRIDE, sig->mr->addr);
wqe = uct_ib_mlx5_sig_set_stride_ctrl(wqe, num_elems, sig->block + 8, 2);
wqe = uct_ib_mlx5_sig_set_stride(wqe, sig->mr->lkey, sig->block,
sig->stride, sig->start);
wqe = uct_ib_mlx5_sig_set_stride(wqe, sig->dif_mr->lkey, 8, 8, sig->dif);
wqe = uct_ib_mlx5_sig_umr_round_bb(wqe);
wqe = uct_ib_mlx5_txwq_wrap_exact(txwq, wqe);
wqe = uct_ib_mlx5_sig_set_bsf(wqe, UCT_IB_MLX5_T10DIF_BLOCK * num_elems, md->psv.idx);
wqe = uct_ib_mlx5_sig_set_bsf(wqe, sig->block * num_elems, md->psv.idx);

status = uct_ib_umr_post(md->umr, wqe, htobe32(sig->sig_key));
if (status != UCS_OK) {
Expand Down Expand Up @@ -336,7 +343,7 @@ ucs_status_t uct_ib_mlx5_sig_mr_cleanup(uct_ib_mlx5_mem_t *memh)
int ret;

length = sig->mr->length;
num_elems = length / UCT_IB_MLX5_T10DIF_BLOCK;
num_elems = length / sig->block;

ret = mlx5dv_devx_obj_destroy(sig->sig_mr);
if (ret != 0) {
Expand Down
Loading

0 comments on commit 13003a5

Please sign in to comment.