Skip to content

Commit

Permalink
TEST/IODEMO: Avoid sending more messages after connection error
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe committed Apr 17, 2020
1 parent 14c63c5 commit 142bc92
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
14 changes: 8 additions & 6 deletions test/apps/iodemo/io_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class P2pDemoCommon : protected UcxContext {
free(_buffer);
}

virtual void operator()() {
virtual void operator()(ucs_status_t status) {
delete this;
}

Expand Down Expand Up @@ -124,8 +124,10 @@ class DemoServer : private P2pDemoCommon {
_server(server), _conn(conn), _sn(sn) {
}

virtual void operator()() {
_server->send_io_message(_conn, IO_COMP, _sn);
virtual void operator()(ucs_status_t status) {
if (status == UCS_OK) {
_server->send_io_message(_conn, IO_COMP, _sn);
}
delete this;
}

Expand Down Expand Up @@ -157,7 +159,7 @@ class DemoServer : private P2pDemoCommon {

void handle_io_read_request(UcxConnection* conn, const iomsg_hdr_t *hdr) {
// send data
assert(opts().data_size == hdr->data_size);
assert(opts().data_size >= hdr->data_size);
conn->send_data(buffer(), hdr->data_size, hdr->sn);

// send response as data
Expand All @@ -168,7 +170,7 @@ class DemoServer : private P2pDemoCommon {
}

void handle_io_write_request(UcxConnection* conn, const iomsg_hdr_t *hdr) {
assert(opts().data_size == hdr->data_size);
assert(opts().data_size >= hdr->data_size);
conn->recv_data(buffer(), hdr->data_size, hdr->sn,
new IoWriteResponseCallback(this, conn, hdr->sn));
}
Expand Down Expand Up @@ -201,7 +203,7 @@ class DemoClient : private P2pDemoCommon {
free(_buffer);
}

virtual void operator()() {
virtual void operator()(ucs_status_t status) {
/* wait data and response completion */
if (++_counter < 2) {
return;
Expand Down
22 changes: 15 additions & 7 deletions test/apps/iodemo/ucx_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct UcxError : public std::exception {
ucs_status_t _status;
};

void UcxCallback::operator()()
void UcxCallback::operator()(ucs_status_t status)
{
}

Expand Down Expand Up @@ -513,6 +513,10 @@ void UcxConnection::ep_close(enum ucp_ep_close_mode mode)
void UcxConnection::send_common(const void *buffer, size_t length, ucp_tag_t tag,
UcxCallback* callback)
{
if (_ep == NULL) {
return;
}

ucs_status_ptr_t ptr_status = ucp_tag_send_nb(_ep, buffer, length,
ucp_dt_make_contig(1), tag,
common_request_callback);
Expand All @@ -533,7 +537,7 @@ void UcxConnection::common_request_callback(void *request, ucs_status_t status)
ucx_request *r = reinterpret_cast<ucx_request*>(request);
if (r->callback) {
// already processed by send function
(*r->callback)();
(*r->callback)(status);
UcxContext::request_release(r);
} else {
// not yet processed by send function
Expand All @@ -552,18 +556,22 @@ void UcxConnection::process_request(const char *what,
ucs_status_ptr_t ptr_status,
UcxCallback* callback)
{
ucs_status_t status;

if (ptr_status == NULL) {
(*callback)();
(*callback)(UCS_OK);
} else if (UCS_PTR_IS_ERR(ptr_status)) {
log() << what << "failed with status"
<< ucs_status_string(UCS_PTR_STATUS(ptr_status)) << std::endl;
throw UcxError(what, UCS_PTR_STATUS(ptr_status));
status = UCS_PTR_STATUS(ptr_status);
log() << what << "failed with status" << ucs_status_string(status)
<< std::endl;
(*callback)(status);
throw UcxError(what, status);
} else {
// pointer to request
ucx_request *r = reinterpret_cast<ucx_request*>(ptr_status);
if (r->completed) {
// already completed by callback
(*callback)();
(*callback)(r->status);
UcxContext::request_release(r);
} else {
// will be completed by callback
Expand Down
2 changes: 1 addition & 1 deletion test/apps/iodemo/ucx_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class verbose_ostream {
class UcxCallback {
public:
virtual ~UcxCallback();
virtual void operator()();
virtual void operator()(ucs_status_t status);
};


Expand Down

0 comments on commit 142bc92

Please sign in to comment.