forked from emmertex/openpilot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
C++ implementation of SubMaster and PubMaster (emmertex#42)
* add class SubMaster&PubMaster * add socketmaster.cc * move to cereal_objects * modify SubMaster * modify SubMaster * Code streamlining * m createSocket remove dup SubMessage * Streamline code Streamline code Streamline code alive if delay with LogMonoTime x reduce code lines add underscore to class variable,dont check getValid() in poll add static function get_service(), assert if pub name is not in service list remove paramater alive from recieve() rename services to endpoints is alive is freq_ <= (1e-5) referer use sockets_.find() in pushMaster::send to avoid create an empty entry submaster only do poller, rm PubMaster,add PubMessage add MessageContext make SubMaster api same as python version add class PubMessage add empty line fix type fix typo remove service_list_ Streamline code use cached heaparry to build&send msg reduce lines rename sockets_ to messages_ initialize member variable in class don't use std::find fix typo * use global context,remove class PubMessage remove SubMessage interface from header,add operator[] undo format source add new line at end of file add missing header fiel reve std::find add empty line better drain
- Loading branch information
Showing
3 changed files
with
201 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
#include <assert.h> | ||
#include <time.h> | ||
#include "messaging.hpp" | ||
#include "services.h" | ||
#ifdef __APPLE__ | ||
#define CLOCK_BOOTTIME CLOCK_MONOTONIC | ||
#endif | ||
static inline uint64_t nanos_since_boot() { | ||
struct timespec t; | ||
clock_gettime(CLOCK_BOOTTIME, &t); | ||
return t.tv_sec * 1000000000ULL + t.tv_nsec; | ||
} | ||
static const service *get_service(const char *name) { | ||
for (const auto &it : services) { | ||
if (strcmp(it.name, name) == 0) return ⁢ | ||
} | ||
return nullptr; | ||
} | ||
static inline bool inList(const std::initializer_list<const char *> &list, const char *value) { | ||
for (auto &v : list) { | ||
if (strcmp(value, v) == 0) return true; | ||
} | ||
return false; | ||
} | ||
|
||
class MessageContext { | ||
public: | ||
MessageContext() { ctx_ = Context::create(); } | ||
~MessageContext() { delete ctx_; } | ||
Context *ctx_; | ||
}; | ||
MessageContext ctx; | ||
|
||
struct SubMaster::SubMessage { | ||
std::string name; | ||
SubSocket *socket = nullptr; | ||
int freq = 0; | ||
bool updated = false, alive = false, valid = false, ignore_alive; | ||
uint64_t rcv_time = 0, rcv_frame = 0; | ||
void *allocated_msg_reader = nullptr; | ||
capnp::FlatArrayMessageReader *msg_reader = nullptr; | ||
kj::Array<capnp::word> buf; | ||
cereal::Event::Reader event; | ||
}; | ||
|
||
SubMaster::SubMaster(const std::initializer_list<const char *> &service_list, const char *address, | ||
bool conflate, const std::initializer_list<const char *> &ignore_alive) { | ||
poller_ = Poller::create(); | ||
for (auto name : service_list) { | ||
const service *serv = get_service(name); | ||
assert(serv != nullptr); | ||
SubSocket *socket = SubSocket::create(ctx.ctx_, name, address ? address : "127.0.0.1", conflate); | ||
assert(socket != 0); | ||
poller_->registerSocket(socket); | ||
SubMessage *m = new SubMessage{ | ||
.socket = socket, | ||
.freq = serv->frequency, | ||
.ignore_alive = inList(ignore_alive, name), | ||
.allocated_msg_reader = malloc(sizeof(capnp::FlatArrayMessageReader)), | ||
.buf = kj::heapArray<capnp::word>(1024)}; | ||
messages_[socket] = m; | ||
services_[name] = m; | ||
} | ||
} | ||
|
||
int SubMaster::update(int timeout) { | ||
if (++frame_ == UINT64_MAX) frame_ = 1; | ||
for (auto &kv : messages_) kv.second->updated = false; | ||
|
||
int updated = 0; | ||
auto sockets = poller_->poll(timeout); | ||
uint64_t current_time = nanos_since_boot(); | ||
for (auto s : sockets) { | ||
Message *msg = s->receive(true); | ||
if (msg == nullptr) continue; | ||
|
||
SubMessage *m = messages_.at(s); | ||
const size_t size = (msg->getSize() / sizeof(capnp::word)) + 1; | ||
if (m->buf.size() < size) { | ||
m->buf = kj::heapArray<capnp::word>(size); | ||
} | ||
memcpy(m->buf.begin(), msg->getData(), msg->getSize()); | ||
delete msg; | ||
|
||
if (m->msg_reader) { | ||
m->msg_reader->~FlatArrayMessageReader(); | ||
} | ||
m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(kj::ArrayPtr<capnp::word>(m->buf.begin(), size)); | ||
m->event = m->msg_reader->getRoot<cereal::Event>(); | ||
m->updated = true; | ||
m->rcv_time = current_time; | ||
m->rcv_frame = frame_; | ||
m->valid = m->event.getValid(); | ||
|
||
++updated; | ||
} | ||
|
||
for (auto &kv : messages_) { | ||
SubMessage *m = kv.second; | ||
m->alive = (m->freq <= (1e-5) || ((current_time - m->rcv_time) * (1e-9)) < (10.0 / m->freq)); | ||
} | ||
return updated; | ||
} | ||
|
||
bool SubMaster::all_(const std::initializer_list<const char *> &service_list, bool valid, bool alive) { | ||
int found = 0; | ||
for (auto &kv : messages_) { | ||
SubMessage *m = kv.second; | ||
if (service_list.size() == 0 || inList(service_list, m->name.c_str())) { | ||
found += (!valid || m->valid) && (!alive || (m->alive && !m->ignore_alive)); | ||
} | ||
} | ||
return service_list.size() == 0 ? found == messages_.size() : found == service_list.size(); | ||
} | ||
|
||
void SubMaster::drain() { | ||
while (true) { | ||
auto polls = poller_->poll(0); | ||
if (polls.size() == 0) | ||
break; | ||
|
||
for (auto sock : polls) { | ||
Message *msg = sock->receive(true); | ||
delete msg; | ||
} | ||
} | ||
} | ||
|
||
bool SubMaster::updated(const char *name) const { return services_.at(name)->updated; } | ||
cereal::Event::Reader &SubMaster::operator[](const char *name) { return services_.at(name)->event; }; | ||
|
||
SubMaster::~SubMaster() { | ||
delete poller_; | ||
for (auto &kv : messages_) { | ||
SubMessage *m = kv.second; | ||
if (m->msg_reader) { | ||
m->msg_reader->~FlatArrayMessageReader(); | ||
} | ||
free(m->allocated_msg_reader); | ||
delete m->socket; | ||
delete m; | ||
} | ||
} | ||
|
||
PubMaster::PubMaster(const std::initializer_list<const char *> &service_list) { | ||
for (auto name : service_list) { | ||
assert(get_service(name) != nullptr); | ||
PubSocket *socket = PubSocket::create(ctx.ctx_, name); | ||
assert(socket); | ||
sockets_[name] = socket; | ||
} | ||
} | ||
|
||
int PubMaster::send(const char *name, capnp::MessageBuilder &msg) { | ||
auto words = capnp::messageToFlatArray(msg); | ||
auto bytes = words.asBytes(); | ||
return send(name, bytes.begin(), bytes.size()); | ||
} | ||
|
||
PubMaster::~PubMaster() { | ||
for (auto s : sockets_) delete s.second; | ||
} |