Skip to content

Commit

Permalink
Merge branch 'master' of github.com:MAYHEM-Lab/cspot
Browse files Browse the repository at this point in the history
  • Loading branch information
ckrintz committed Apr 13, 2024
2 parents 0b4f84b + 7891c55 commit ae2c24b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/include/woofc-access.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ int WooFMsgRepair(char* woof_name, Dlist* holes);
*/
//#define WOOF_MSG_REQ_TIMEOUT (90000)
//#define WOOF_MSG_REQ_TIMEOUT (500)
#define WOOF_MSG_REQ_TIMEOUT (5000)
#define WOOF_MSG_REQ_TIMEOUT (3000)

#define WOOF_MSG_THREADS (15)

Expand Down
45 changes: 44 additions & 1 deletion src/net/zmq/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,33 @@ per_endpoint_data* backend::get_local_socket_for(const std::string& endpoint) {
return &it->second;
}

per_endpoint_data* backend::reset_local_socket_for(const std::string& endpoint) {

auto& map_for_thread = m_per_thread_socks[std::this_thread::get_id()];
auto it = map_for_thread.find(endpoint);
if (it == map_for_thread.end()) {
// Socket does not exist
auto ep_data = cspot::zmq::per_endpoint_data::create(endpoint);
if (!ep_data) {
return nullptr;
}
auto [i, ins] = map_for_thread.emplace(endpoint, std::move(*ep_data));
it = i;
return &it->second;
} else {
// doesn't work with cspot 2.0 but does with 1.0
map_for_thread.erase(endpoint); // should call destructor on server and poller
auto ep_data = cspot::zmq::per_endpoint_data::create(endpoint);
if (!ep_data) {
return nullptr;
}
auto [i, ins] = map_for_thread.emplace(endpoint, std::move(*ep_data));
it = i;
return &it->second;
}

}

ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) {
auto ep_data = get_local_socket_for(endpoint);
if (!ep_data) {
Expand All @@ -40,13 +67,29 @@ ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) {
return nullptr;
}

zmsg_t *dmsg = msg.get();
zmsg_t *dup_msg = zmsg_dup(dmsg);
auto sent = Send(std::move(msg), *ep_data->server);

if (!sent) {
DEBUG_WARN("ServerRequest: msg send to %s failed\n", endpoint);
printf("ServerRequest: msg send to %s failed\n", endpoint);
fflush(stdout);
return nullptr;
ep_data = reset_local_socket_for(endpoint);
if(!ep_data) {
printf("ServerRequest: failed to reset endpoint for %s\n",endpoint);
return nullptr;
} else {
printf("ServerRequest: attempting send with reset endpoint for %s\n",endpoint);
//sent = Send(std::move(msg), *ep_data->server);
int rc = zmsg_send(&dup_msg, ep_data->server.get());
if(rc != 0) {
printf("ServerRequest: send with reset failed to endpoint for %s\n",endpoint);
return nullptr;
}
}
} else {
zmsg_destroy(&dup_msg);
}

auto server_resp = static_cast<zsock_t*>(zpoller_wait(ep_data->resp_poll.get(), WOOF_MSG_REQ_TIMEOUT));
Expand Down
3 changes: 2 additions & 1 deletion src/net/zmq/backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class backend : public network_backend {
private:
ZMsgPtr ServerRequest(const char* endpoint, ZMsgPtr msg_arg);
per_endpoint_data* get_local_socket_for(const std::string& endpoint);
per_endpoint_data* reset_local_socket_for(const std::string& endpoint);

using AddrSockMapT = std::unordered_map<std::string, per_endpoint_data>;

Expand All @@ -48,4 +49,4 @@ class backend : public network_backend {
ZActorPtr m_proxy;
std::atomic<bool> m_stop_called = false;
};
}
}
35 changes: 18 additions & 17 deletions src/woofc-mqtt-gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pthread_mutex_t ELock; // for endpoint cache
#include "woofc-mqtt.h"

#define WOOF_MQTT_MSG_THREADS (20)
#define WOOF_MQTT_THREADS (20)
#define WOOF_MQTT_MSG_REQ_TIMEOUT (10000)

void DeviceSend(char *s);
Expand All @@ -34,7 +35,7 @@ int Timeout;
struct timespec ardino_delay = {1,0}; /* 1/3 second for lwip tcp memory leak */
//#define PAUSE nanosleep(&ardino_delay,NULL)

#define DEFAULT_MQTT_TIMEOUT (3)
#define DEFAULT_MQTT_TIMEOUT (5)


double GetTime()
Expand Down Expand Up @@ -187,15 +188,15 @@ void *RespTimeoutThread(void *arg)
RB_FORWARD(RespList,rb) {
r = (RESP *)rb->value.v;
ts = GetTime();
if((ts - r->time) > *timeout) {
if((ts - r->time) > (*timeout/4)) {
r->abort = 1;
if(r->s != NULL) {
VSemPT(r->s);
}
}
}
pthread_mutex_unlock(&RLock);
sleep(*timeout);
sleep(*timeout/4);
}
pthread_exit(NULL);
}
Expand Down Expand Up @@ -665,7 +666,7 @@ printf("WooFProcessPut: called on %s with size %d\n",woof_name,copy_size);
*/
#ifdef FP
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 0 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
msgid,
Timeout,
Broker,
Expand Down Expand Up @@ -702,7 +703,7 @@ printf("sub_string: %s\n",sub_string);
strncpy(hand_name,"NULL",sizeof(hand_name));
}
#ifdef FP
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 0 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%s|%s\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%s|%s\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -877,7 +878,7 @@ void WooFProcessGetElSize(zmsg_t *req_msg, zsock_t *receiver)
*/
#ifdef FP
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 0 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
msgid,
Timeout,
Broker,
Expand Down Expand Up @@ -905,7 +906,7 @@ printf("sub_string: %s\n",sub_string);
*/
memset(pub_string,0,sizeof(pub_string));
#ifdef FP
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 0 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -1078,7 +1079,7 @@ void WooFProcessGetLatestSeqno(zmsg_t *req_msg, zsock_t *receiver)
*/
#ifdef FP
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 0 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
msgid,
Timeout,
Broker,
Expand Down Expand Up @@ -1107,7 +1108,7 @@ printf("sub_string: %s\n",sub_string);
*/
memset(pub_string,0,sizeof(pub_string));
#ifdef FP
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 0 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -1293,7 +1294,7 @@ printf("WooFProcessGet: called on %s for seqno %lu\n",woof_name,seq_no);
msgid = rand();
#ifdef FP
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 0 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -c -i %d -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
msgid,
Timeout,
Broker,
Expand Down Expand Up @@ -1325,7 +1326,7 @@ printf("sub_string: %s\n",sub_string);
*/
memset(pub_string,0,sizeof(pub_string));
#ifdef FP
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 0 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%d\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%d\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -1610,7 +1611,7 @@ int WooFMsgServerMQTT(const char *wnamespace)
zactor_t *proxy;
int err;
char endpoint[255];
pthread_t tids[WOOF_MSG_THREADS];
pthread_t tids[WOOF_MQTT_THREADS];
int i;


Expand Down Expand Up @@ -1674,7 +1675,7 @@ int WooFMsgServerMQTT(const char *wnamespace)
* so this can be increased if need be
*/
pthread_mutex_init(&ELock,NULL); // init global lock for thread map
for (i = 0; i < WOOF_MSG_THREADS; i++)
for (i = 0; i < WOOF_MQTT_THREADS; i++)
{
err = pthread_create(&tids[i], NULL, WooFMsgThread, NULL);
if (err < 0)
Expand All @@ -1688,7 +1689,7 @@ int WooFMsgServerMQTT(const char *wnamespace)
* right now, there is no way for these threads to exit so the msg server will block
* indefinitely in this join
*/
for (i = 0; i < WOOF_MSG_THREADS; i++)
for (i = 0; i < WOOF_MQTT_THREADS; i++)
{
pthread_join(tids[i], NULL);
}
Expand Down Expand Up @@ -2225,7 +2226,7 @@ void *MQTTDeviceOutputThread(void *arg)
* no timeout here
*/
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 0 -h %s -t %s.output -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -h %s -t %s.output -u \'%s\' -P \'%s\'",
Broker,
device_name,
User_name,
Expand Down Expand Up @@ -2399,7 +2400,7 @@ printf("ORIGIN msg id %d\n",msgid);
*/
memset(pub_string,0,sizeof(pub_string));
#ifdef FP
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 0 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s\'",
Broker,
device_name,
User_name,
Expand Down Expand Up @@ -2572,7 +2573,7 @@ int main(int argc, char **argv)
* create connection to broker for device input pubs
*/
memset(pub_string,0, sizeof(pub_string));
sprintf(pub_string,"/usr/bin/mosquitto_pub --stdin-line -q 0 -h %s -t %s.input -u \'%s\' -P \'%s\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub --stdin-line -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\'",
Broker,
Device_name_space,
User_name,
Expand Down

0 comments on commit ae2c24b

Please sign in to comment.