Skip to content

Commit

Permalink
MSGQ stability improvements when opening and closing lots of queues
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed Nov 15, 2019
1 parent e147abc commit ef64eb2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
2 changes: 2 additions & 0 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void MSGQSubSocket::setTimeout(int t){
}

MSGQSubSocket::~MSGQSubSocket(){
msgq_close_queue(q);
delete q;
}

Expand Down Expand Up @@ -149,6 +150,7 @@ int MSGQPubSocket::send(char *data, size_t size){
}

MSGQPubSocket::~MSGQPubSocket(){
msgq_close_queue(q);
delete q;
}

Expand Down
48 changes: 41 additions & 7 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,24 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
auto fd = open(full_path, O_RDWR | O_CREAT, 0777);
delete[] full_path;

assert(fd >= 0); // TODO: properly handle exit codes
if (fd < 0)
return -1;

int rc = ftruncate(fd, size + sizeof(msgq_header_t));
assert(rc == 0); // TODO: properly handle exit codes
if (rc < 0)
return -1;

char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);

assert(mem != NULL); // TODO: properly handle exit codes
if (mem == NULL)
return -1;

q->mmap_p = mem;

msgq_header_t *header = (msgq_header_t *)mem;

// Setup pointers to header segment
Expand All @@ -107,18 +114,34 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){

q->endpoint = path;
q->read_conflate = false;
q->read_fifo = -1;

return 0;
}

void msgq_close_queue(msgq_queue_t *q){
if (q->read_fifo >= 0){
close(q->read_fifo);
}

for (uint64_t i = 0; i < NUM_READERS; i++){
if (q->read_fifos[i] >= 0){
close(q->read_fifos[i]);
}
}

if (q->mmap_p != NULL){
munmap(q->mmap_p, q->size + sizeof(msgq_header_t));
}
}


void msgq_init_publisher(msgq_queue_t * q) {
std::cout << "Starting publisher" << std::endl;

std::random_device rd;
std::default_random_engine generator(rd());
std::random_device rd("/dev/urandom");
std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint64_t>::max());
uint64_t uid = distribution(generator);
uint64_t uid = distribution(rd);

*q->write_uid = uid;
*q->num_readers = 0;
Expand All @@ -134,10 +157,12 @@ void msgq_init_publisher(msgq_queue_t * q) {
}

void msgq_init_subscriber(msgq_queue_t * q) {
std::random_device rd;
std::default_random_engine generator(rd());
assert(q != NULL);
assert(q->num_readers != NULL);

std::random_device rd("/dev/urandom");
std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint64_t>::max());
uint64_t uid = distribution(generator);
uint64_t uid = distribution(rd);

// Get reader id
while (true){
Expand Down Expand Up @@ -179,12 +204,16 @@ void msgq_init_subscriber(msgq_queue_t * q) {

std::cout << q->read_fifo_path << std::endl;
int r = mkfifo(q->read_fifo_path.c_str(), 0777);
if (r != 0)
perror("Fifo: ");
assert(r == 0);

q->read_fifo = open(q->read_fifo_path.c_str(), O_RDWR | O_NONBLOCK);

// Fysnc so the fifo shows up in the directory
fsync(open("/dev/shm", O_RDONLY));
auto shm_fd = open("/dev/shm", O_RDONLY);
fsync(shm_fd);
close(shm_fd);

std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << std::endl;
msgq_reset_reader(q);
Expand Down Expand Up @@ -271,6 +300,11 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){

// Open fifo when not set, or when reader changes
if (q->read_fifos[i] == -1 || q->read_fifos_uid[i] != reader_uid){
// Close old reader fifo
if (q->read_fifos[i] >= 0){
close(q->read_fifos[i]);
}

q->read_fifos_uid[i] = reader_uid;

std::string path = "/dev/shm/fifo-";
Expand Down
2 changes: 2 additions & 0 deletions messaging/msgq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct msgq_queue_t {
std::atomic<uint64_t> *read_pointers[NUM_READERS];
std::atomic<uint64_t> *read_valids[NUM_READERS];
std::atomic<uint64_t> *read_uids[NUM_READERS];
char * mmap_p;
char * data;
size_t size;
int reader_id;
Expand Down Expand Up @@ -62,6 +63,7 @@ int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size);
int msgq_msg_close(msgq_msg_t *msg);

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size);
void msgq_close_queue(msgq_queue_t *q);
void msgq_init_publisher(msgq_queue_t * q);
void msgq_init_subscriber(msgq_queue_t * q);

Expand Down

0 comments on commit ef64eb2

Please sign in to comment.