From 347a86615ca763939ae5c133d96623cd8ee36d2c Mon Sep 17 00:00:00 2001 From: George Hotz Date: Fri, 22 Nov 2019 12:06:26 -0800 Subject: [PATCH] Switch from polling on FIFOs to signal (#12) * uids are pids * remove fifo, go for signal * that's more what i'm saying * oops, that's wrong * add some todos * make signal poll work * bring randomness back * fix case in msgq when subs get evicted while polling * check for ready messages before poll starts * No pr builds * use nanosleep with remainder * this should pass the test --- azure-pipelines.yml | 2 + messaging/msgq.cc | 155 +++++++++++---------------------- messaging/msgq.hpp | 7 -- messaging/tests/test_poller.py | 9 +- 4 files changed, 59 insertions(+), 114 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 613754aec1dcbf..c14526a5c1c404 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,3 +1,5 @@ +pr: none + pool: vmImage: 'ubuntu-16.04' diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 777f11a287cfce..54058d212f7a49 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -5,24 +5,35 @@ #include #include #include -#include #include #include - +#include +#include #include #include #include #include #include +#include #include #include - #include #include "msgq.hpp" +void sigusr1_handler(int signal) { + assert(signal == SIGUSR1); +} + +uint64_t msgq_get_uid(void){ + std::random_device rd("/dev/urandom"); + std::uniform_int_distribution distribution(0,std::numeric_limits::max()); + + uint64_t uid = distribution(rd) << 32 | syscall(SYS_gettid); + return uid; +} int msgq_msg_init_size(msgq_msg_t * msg, size_t size){ msg->size = size; @@ -69,6 +80,8 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes + std::signal(SIGUSR1, sigusr1_handler); + const char * prefix = "/dev/shm/"; char * full_path = new char[strlen(path) + strlen(prefix) + 1]; strcpy(full_path, prefix); @@ -114,23 +127,11 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ q->endpoint = path; q->read_conflate = false; - q->read_fifo = -1; return 0; } void msgq_close_queue(msgq_queue_t *q){ - if (q->read_fifo >= 0){ - close(q->read_fifo); - remove(q->read_fifo_path.c_str()); - } - - for (uint64_t i = 0; i < NUM_READERS; i++){ - if (q->read_fifos[i] >= 0){ - close(q->read_fifos[i]); - } - } - if (q->mmap_p != NULL){ munmap(q->mmap_p, q->size + sizeof(msgq_header_t)); } @@ -139,18 +140,13 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { std::cout << "Starting publisher" << std::endl; - - std::random_device rd("/dev/urandom"); - std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - uint64_t uid = distribution(rd); + uint64_t uid = msgq_get_uid(); *q->write_uid = uid; *q->num_readers = 0; for (size_t i = 0; i < NUM_READERS; i++){ *q->read_valids[i] = false; - q->read_fifos[i] = -1; - q->read_fifos_uid[i] = 0; *q->read_uids[i] = 0; } @@ -161,9 +157,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); - std::random_device rd("/dev/urandom"); - std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - uint64_t uid = distribution(rd); + uint64_t uid = msgq_get_uid(); // Get reader id while (true){ @@ -177,7 +171,12 @@ void msgq_init_subscriber(msgq_queue_t * q) { for (size_t i = 0; i < NUM_READERS; i++){ *q->read_valids[i] = false; + + uint64_t old_uid = *q->read_uids[i]; *q->read_uids[i] = 0; + + // Wake up reader in case they are in a poll + syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); } continue; @@ -200,26 +199,6 @@ void msgq_init_subscriber(msgq_queue_t * q) { } } - for (size_t i = 0; i < NUM_READERS; i++){ - q->read_fifos[i] = -1; - } - - q->read_fifo_path = "/dev/shm/fifo-"; - q->read_fifo_path += std::to_string(q->read_uid_local); - - std::cout << q->read_fifo_path << std::endl; - int r = mkfifo(q->read_fifo_path.c_str(), 0777); - if (r != 0) - perror("Fifo: "); - assert(r == 0); - - q->read_fifo = open(q->read_fifo_path.c_str(), O_RDWR | O_NONBLOCK); - - // Fysnc so the fifo shows up in the directory - auto shm_fd = open("/dev/shm", O_RDONLY); - fsync(shm_fd); - close(shm_fd); - std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; msgq_reset_reader(q); } @@ -231,7 +210,6 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ assert(q->write_uid_local == *q->write_uid); } - uint64_t total_msg_size = ALIGN(msg->size + sizeof(int64_t)); // We need to fit at least three messages in the queue, @@ -303,44 +281,12 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ for (uint64_t i = 0; i < num_readers; i++){ uint64_t reader_uid = *q->read_uids[i]; - // Open fifo when not set, or when reader changes - if (q->read_fifos[i] == -1 || q->read_fifos_uid[i] != reader_uid){ - // Close old reader fifo - if (q->read_fifos[i] >= 0){ - close(q->read_fifos[i]); - } - - q->read_fifos_uid[i] = reader_uid; - - std::string path = "/dev/shm/fifo-"; - path += std::to_string(reader_uid); - - q->read_fifos[i] = open(path.c_str(), O_RDWR | O_NONBLOCK); - if(q->read_fifos[i] < 0){ - std::cout << "Fifo: " << path << std::endl; - perror("Error opening fifo"); - } - } - - uint8_t m = 1; - write(q->read_fifos[i], &m, 1); + syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1); } return msg->size; } -int msgq_get_fd(msgq_queue_t * q){ - int id = q->reader_id; - assert(id >= 0); // Make sure subscriber is initialized - - if (q->read_uid_local != *q->read_uids[id]){ - std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl; - msgq_init_subscriber(q); - } - - return q->read_fifo; -} - int msgq_msg_ready(msgq_queue_t * q){ start: @@ -380,10 +326,6 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ goto start; } - // Read one byte from fifo - char buf[1]; - read(q->read_fifo, buf, 1); - // Check valid if (!*q->read_valids[id]){ msgq_reset_reader(q); @@ -465,34 +407,37 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ assert(timeout >= 0); int num = 0; - struct pollfd * fds = (struct pollfd *)calloc(nitems, sizeof(struct pollfd)); - - // Build poll structure - for (size_t i = 0; i < nitems; i++){ - fds[i].fd = msgq_get_fd(items[i].q); - fds[i].events = POLLIN; - - // Check if message is ready in case we get out of sync with the pipe - if (msgq_msg_ready(items[i].q)){ - items[i].revents = 1; - timeout = 0; // No timeout if a message is ready - num++; - } else { - items[i].revents = 0; - } + + // Check if messages ready + for (size_t i = 0; i < nitems; i++) { + items[i].revents = msgq_msg_ready(items[i].q); + if (items[i].revents) num++; } - poll(fds, nitems, timeout); + int ms = (timeout == -1) ? 100 : timeout; + struct timespec ts; + ts.tv_sec = ms / 1000; + ts.tv_nsec = (ms % 1000) * 1000 * 1000; + - // Read poll results - for (size_t i = 0; i < nitems; i++){ - if (fds[i].revents && !items[i].revents){ - // Don't add it if it was already added - num++; - items[i].revents = 1; + while (num == 0) { + int ret; + + ret = nanosleep(&ts, &ts); + + // Check if messages ready + for (size_t i = 0; i < nitems; i++) { + if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){ + num += 1; + items[i].revents = 1; + } + } + + // exit if we had a timeout and the sleep finished + if (timeout != -1 && ret == 0){ + break; } } - free(fds); return num; } diff --git a/messaging/msgq.hpp b/messaging/msgq.hpp index 116523e38d8250..3bead13d9d7e67 100644 --- a/messaging/msgq.hpp +++ b/messaging/msgq.hpp @@ -35,14 +35,7 @@ struct msgq_queue_t { uint64_t write_uid_local; bool read_conflate; - int read_fifo; - - // Fifo fds and corresponding reader uid - int read_fifos[NUM_READERS]; - uint64_t read_fifos_uid[NUM_READERS]; - std::string endpoint; - std::string read_fifo_path; }; struct msgq_msg_t { diff --git a/messaging/tests/test_poller.py b/messaging/tests/test_poller.py index 0f343fb89f0a66..facc74ef8973c9 100644 --- a/messaging/tests/test_poller.py +++ b/messaging/tests/test_poller.py @@ -15,7 +15,7 @@ def poller(): sub.connect(context, 'controlsState') p.registerSocket(sub) - socks = p.poll(1000) + socks = p.poll(10000) r = [s.receive(non_blocking=True) for s in socks] return r @@ -44,7 +44,6 @@ def test_poll_once(self): self.assertEqual(result, [b"a"]) - @unittest.skipIf(os.environ.get('MSGQ'), "fails under msgq") def test_poll_and_create_many_subscribers(self): context = messaging.Context() @@ -59,6 +58,8 @@ def test_poll_and_create_many_subscribers(self): for _ in range(10): messaging.SubSocket().connect(c, 'controlsState') + time.sleep(0.1) + # Send message pub.send("a") @@ -69,3 +70,7 @@ def test_poll_and_create_many_subscribers(self): context.term() self.assertEqual(result, [b"a"]) + + +if __name__ == "__main__": + unittest.main()