Skip to content

Commit

Permalink
C++ implementation of SubMaster and PubMaster (#42)
Browse files Browse the repository at this point in the history
* add class SubMaster&PubMaster

* add socketmaster.cc

* move to cereal_objects

* modify SubMaster

* modify SubMaster

* Code streamlining

* m createSocket

remove dup SubMessage

* Streamline code

Streamline code

Streamline code

alive if delay with LogMonoTime x

reduce code lines

add underscore to class variable,dont check getValid() in poll

add static function get_service(), assert if pub name is not in service list

remove paramater alive from recieve()

rename services to endpoints

is alive is freq_ <= (1e-5)

referer

use sockets_.find() in pushMaster::send to avoid create an empty entry

submaster only do poller, rm PubMaster,add PubMessage

add MessageContext

make SubMaster api same as python version

add class PubMessage

add empty line

fix type

fix typo

remove service_list_

Streamline code

 use cached heaparry to build&send msg

reduce lines

rename sockets_ to messages_

initialize member variable in class

don't use std::find

fix typo

* use global context,remove class PubMessage

remove SubMessage interface from header,add operator[]

undo format source

add new line at end of file

add missing header fiel

reve std::find

add empty line

better drain
  • Loading branch information
deanlee authored May 21, 2020
1 parent c1a6d75 commit 30838d4
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 1 deletion.
1 change: 1 addition & 0 deletions SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ if shutil.which('capnpc-java'):
cereal_objects = env.SharedObject([
'gen/cpp/car.capnp.c++',
'gen/cpp/log.capnp.c++',
'messaging/socketmaster.cc',
])

env.Library('cereal', cereal_objects)
Expand Down
39 changes: 38 additions & 1 deletion messaging/messaging.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once
#include <cstddef>
#include <vector>
#include <map>
#include <string>
#include <vector>
#include <capnp/serialize.h>
#include "../gen/cpp/log.capnp.h"

#define MSG_MULTIPLE_PUBLISHERS 100

Expand Down Expand Up @@ -54,3 +57,37 @@ class Poller {
static Poller * create(std::vector<SubSocket*> sockets);
virtual ~Poller(){};
};

class SubMaster {
public:
SubMaster(const std::initializer_list<const char *> &service_list,
const char *address = nullptr, bool conflate = false, const std::initializer_list<const char *> &ignore_alive = {});
int update(int timeout = 1000);
inline bool allAlive(const std::initializer_list<const char *> &service_list = {}) { return all_(service_list, false, true); }
inline bool allValid(const std::initializer_list<const char *> &service_list = {}) { return all_(service_list, true, false); }
inline bool allAliveAndValid(const std::initializer_list<const char *> &service_list = {}) { return all_(service_list, true, true); }
bool updated(const char *name) const;
void drain();
cereal::Event::Reader &operator[](const char *name);
~SubMaster();

private:
bool all_(const std::initializer_list<const char *> &service_list, bool valid, bool alive);
Poller *poller_ = nullptr;
uint64_t frame_ = 0;
struct SubMessage;
std::map<SubSocket *, SubMessage *> messages_;
std::map<std::string, SubMessage *> services_;
};

class PubMaster {
public:
PubMaster(const std::initializer_list<const char *> &service_list);
inline int send(const char *name, capnp::byte *data, size_t size) { return sockets_.at(name)->send((char *)data, size); }
int send(const char *name, capnp::MessageBuilder &msg);
~PubMaster();

private:
std::map<std::string, PubSocket *> sockets_;
};

162 changes: 162 additions & 0 deletions messaging/socketmaster.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#include <assert.h>
#include <time.h>
#include "messaging.hpp"
#include "services.h"
#ifdef __APPLE__
#define CLOCK_BOOTTIME CLOCK_MONOTONIC
#endif
static inline uint64_t nanos_since_boot() {
struct timespec t;
clock_gettime(CLOCK_BOOTTIME, &t);
return t.tv_sec * 1000000000ULL + t.tv_nsec;
}
static const service *get_service(const char *name) {
for (const auto &it : services) {
if (strcmp(it.name, name) == 0) return &it;
}
return nullptr;
}
static inline bool inList(const std::initializer_list<const char *> &list, const char *value) {
for (auto &v : list) {
if (strcmp(value, v) == 0) return true;
}
return false;
}

class MessageContext {
public:
MessageContext() { ctx_ = Context::create(); }
~MessageContext() { delete ctx_; }
Context *ctx_;
};
MessageContext ctx;

struct SubMaster::SubMessage {
std::string name;
SubSocket *socket = nullptr;
int freq = 0;
bool updated = false, alive = false, valid = false, ignore_alive;
uint64_t rcv_time = 0, rcv_frame = 0;
void *allocated_msg_reader = nullptr;
capnp::FlatArrayMessageReader *msg_reader = nullptr;
kj::Array<capnp::word> buf;
cereal::Event::Reader event;
};

SubMaster::SubMaster(const std::initializer_list<const char *> &service_list, const char *address,
bool conflate, const std::initializer_list<const char *> &ignore_alive) {
poller_ = Poller::create();
for (auto name : service_list) {
const service *serv = get_service(name);
assert(serv != nullptr);
SubSocket *socket = SubSocket::create(ctx.ctx_, name, address ? address : "127.0.0.1", conflate);
assert(socket != 0);
poller_->registerSocket(socket);
SubMessage *m = new SubMessage{
.socket = socket,
.freq = serv->frequency,
.ignore_alive = inList(ignore_alive, name),
.allocated_msg_reader = malloc(sizeof(capnp::FlatArrayMessageReader)),
.buf = kj::heapArray<capnp::word>(1024)};
messages_[socket] = m;
services_[name] = m;
}
}

int SubMaster::update(int timeout) {
if (++frame_ == UINT64_MAX) frame_ = 1;
for (auto &kv : messages_) kv.second->updated = false;

int updated = 0;
auto sockets = poller_->poll(timeout);
uint64_t current_time = nanos_since_boot();
for (auto s : sockets) {
Message *msg = s->receive(true);
if (msg == nullptr) continue;

SubMessage *m = messages_.at(s);
const size_t size = (msg->getSize() / sizeof(capnp::word)) + 1;
if (m->buf.size() < size) {
m->buf = kj::heapArray<capnp::word>(size);
}
memcpy(m->buf.begin(), msg->getData(), msg->getSize());
delete msg;

if (m->msg_reader) {
m->msg_reader->~FlatArrayMessageReader();
}
m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(kj::ArrayPtr<capnp::word>(m->buf.begin(), size));
m->event = m->msg_reader->getRoot<cereal::Event>();
m->updated = true;
m->rcv_time = current_time;
m->rcv_frame = frame_;
m->valid = m->event.getValid();

++updated;
}

for (auto &kv : messages_) {
SubMessage *m = kv.second;
m->alive = (m->freq <= (1e-5) || ((current_time - m->rcv_time) * (1e-9)) < (10.0 / m->freq));
}
return updated;
}

bool SubMaster::all_(const std::initializer_list<const char *> &service_list, bool valid, bool alive) {
int found = 0;
for (auto &kv : messages_) {
SubMessage *m = kv.second;
if (service_list.size() == 0 || inList(service_list, m->name.c_str())) {
found += (!valid || m->valid) && (!alive || (m->alive && !m->ignore_alive));
}
}
return service_list.size() == 0 ? found == messages_.size() : found == service_list.size();
}

void SubMaster::drain() {
while (true) {
auto polls = poller_->poll(0);
if (polls.size() == 0)
break;

for (auto sock : polls) {
Message *msg = sock->receive(true);
delete msg;
}
}
}

bool SubMaster::updated(const char *name) const { return services_.at(name)->updated; }
cereal::Event::Reader &SubMaster::operator[](const char *name) { return services_.at(name)->event; };

SubMaster::~SubMaster() {
delete poller_;
for (auto &kv : messages_) {
SubMessage *m = kv.second;
if (m->msg_reader) {
m->msg_reader->~FlatArrayMessageReader();
}
free(m->allocated_msg_reader);
delete m->socket;
delete m;
}
}

PubMaster::PubMaster(const std::initializer_list<const char *> &service_list) {
for (auto name : service_list) {
assert(get_service(name) != nullptr);
PubSocket *socket = PubSocket::create(ctx.ctx_, name);
assert(socket);
sockets_[name] = socket;
}
}

int PubMaster::send(const char *name, capnp::MessageBuilder &msg) {
auto words = capnp::messageToFlatArray(msg);
auto bytes = words.asBytes();
return send(name, bytes.begin(), bytes.size());
}

PubMaster::~PubMaster() {
for (auto s : sockets_) delete s.second;
}

0 comments on commit 30838d4

Please sign in to comment.