Skip to content

Commit

Permalink
anomaly benchmark working with digital twin
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Mar 4, 2024
1 parent ebde170 commit 9f8555c
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/include/woofc-access.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ int WooFMsgRepair(char* woof_name, Dlist* holes);
/*
* 2 minute timeout
*/
// #define WOOF_MSG_REQ_TIMEOUT (120000)
#define WOOF_MSG_REQ_TIMEOUT (120000)
//#define WOOF_MSG_REQ_TIMEOUT (500)
#define WOOF_MSG_REQ_TIMEOUT (10000)
//#define WOOF_MSG_REQ_TIMEOUT (10000)

#define WOOF_MSG_THREADS (15)

Expand Down
14 changes: 14 additions & 0 deletions src/net/zmq/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,36 @@ per_endpoint_data* backend::get_local_socket_for(const std::string& endpoint) {
ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) {
auto ep_data = get_local_socket_for(endpoint);
if (!ep_data) {
printf("ServerRequest: failed to create endpoint for %s\n",endpoint);
fflush(stdout);
return nullptr;
}

auto sent = Send(std::move(msg), *ep_data->server);

if (!sent) {
DEBUG_WARN("ServerRequest: msg send to %s failed\n", endpoint);
printf("ServerRequest: msg send to %s failed\n", endpoint);
fflush(stdout);
return nullptr;
}

#undef WOOF_MSG_REQ_TIMEOUT
#define WOOF_MSG_REQ_TIMEOUT (120000)
auto server_resp = static_cast<zsock_t*>(zpoller_wait(ep_data->resp_poll.get(), WOOF_MSG_REQ_TIMEOUT));
if (!server_resp) {
if (zpoller_expired(ep_data->resp_poll.get())) {
DEBUG_WARN("ServerRequest: msg recv timeout from %s after %d msec\n", endpoint, WOOF_MSG_REQ_TIMEOUT);
printf("ServerRequest: msg recv timeout from %s after %d msec\n", endpoint, WOOF_MSG_REQ_TIMEOUT);
fflush(stdout);
} else if (zpoller_terminated(ep_data->resp_poll.get())) {
DEBUG_WARN("ServerRequest: msg recv interrupted from %s\n", endpoint);
printf("ServerRequest: msg recv interrupted from %s\n", endpoint);
fflush(stdout);
} else {
DEBUG_WARN("ServerRequest: msg recv failed from %s\n", endpoint);
printf("ServerRequest: msg recv interrupted from %s\n", endpoint);
fflush(stdout);
}
return nullptr;
}
Expand All @@ -53,6 +65,8 @@ ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) {

if (!resp) {
DEBUG_WARN("ServerRequest: msg recv from %s failed\n", endpoint);
printf("ServerRequest: msg recv from %s failed\n", endpoint);
fflush(stdout);
}

DEBUG_LOG("ServerRequest: completed successfully");
Expand Down
2 changes: 1 addition & 1 deletion src/net/zmq/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,4 @@ int32_t backend::remote_get_latest_seq_no(std::string_view woof_name,
auto& [str] = *res;
return std::stoul(str);
}
} // namespace cspot::zmq
} // namespace cspot::zmq
141 changes: 113 additions & 28 deletions src/woofc-mqtt-gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern "C" {
#include "woofc-access.h"
#include "woofc-mqtt.h"

#define WOOF_MQTT_MSG_THREADS (1)
#define WOOF_MQTT_MSG_THREADS (20)
#define WOOF_MQTT_MSG_REQ_TIMEOUT (10000)

#define IPLEN (17)
Expand All @@ -35,6 +35,78 @@ struct timespec ardino_delay = {1,0}; /* 1/3 second for lwip tcp memory leak */

#define DEFAULT_MQTT_TIMEOUT (3)


void FreeMsgList(char **msgs)
{
int i;
if(msgs == NULL) {
return;
}
i = 0;
while(msgs[i] != NULL) {
free(msgs[i]);
i++;
}
free(msgs);
return;
}

/*
* mosquitto could pack multiple messages into a single read()
*/
char **ExtractMsgList(char *msg)
{
char *curr;
char *next;
int count;
char **msgs;
char *cmsg;
int len;

/*
* count the new lines
*/
count = 0;
curr = msg;
while(*curr != 0) {
if(*curr == '\n') {
count++;
}
curr++;
}

/*
* need count+ messages and 1 extra for NULL
*/
msgs = (char **)malloc((count+2)*sizeof(char *));
if(msgs == NULL) {
printf("no space for message list\n");
return(NULL);
}
count = 0;
curr = msg;
next = strstr(curr,"\n");
while(next != NULL) {
len = next - curr + 1;
cmsg = (char *)malloc(len);
if(cmsg == NULL) {
printf("no space for cmsg\n");
FreeMsgList(msgs);
return(NULL);
}
memset(cmsg,0,len);
strncpy(cmsg,curr,len-1);
msgs[count] = cmsg;
printf("extracted msg[%d]: %s\n",count,cmsg);
curr = next+1;
count++;
next = strstr(curr,"\n");
}
msgs[count] = NULL;
return(msgs);
}


extern "C" {
static zmsg_t *WooFMQTTRequest(char *endpoint, zmsg_t *msg)
{
Expand Down Expand Up @@ -276,7 +348,7 @@ printf("WooFProcessPut: called on %s with size %d\n",woof_name,copy_size);
* use msgid to get back specific response
*/
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
Timeout,
Broker,
Device_name_space,
Expand All @@ -299,7 +371,7 @@ printf("sub_string: %s\n",sub_string);
if(hand_name[0] == 0) {
strncpy(hand_name,"NULL",sizeof(hand_name));
}
sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%s|%s\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%s|%s\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -452,7 +524,7 @@ void WooFProcessGetElSize(zmsg_t *req_msg, zsock_t *receiver)
* use msgid to get back specific response
*/
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
Timeout,
Broker,
Device_name_space,
Expand All @@ -470,7 +542,7 @@ printf("sub_string: %s\n",sub_string);
* create the mqtt message to put to the device
*/
memset(pub_string,0,sizeof(pub_string));
sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -531,7 +603,7 @@ printf("WooFProcessGetElSize received string: %s\n",resp_string);
}
memset(buffer, 0, sizeof(buffer));
sprintf(buffer, "%lu", el_size);
printf("WooFProcesGetElSize: sening back %s (%lu)\n",buffer,el_size);
printf("WooFProcesGetElSize: woof: %s sending back %s (%lu)\n",woof_name,buffer,el_size);
r_frame = zframe_new(buffer, strlen(buffer));
if (r_frame == NULL)
{
Expand Down Expand Up @@ -621,7 +693,7 @@ void WooFProcessGetLatestSeqno(zmsg_t *req_msg, zsock_t *receiver)
* use msgid to get back specific response
*/
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
Timeout,
Broker,
Device_name_space,
Expand All @@ -639,7 +711,7 @@ printf("sub_string: %s\n",sub_string);
* create the mqtt message to put to the device
*/
memset(pub_string,0,sizeof(pub_string));
sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -700,7 +772,7 @@ printf("WooFProcessGetLatestSeqno resp string: %s\n",resp_string);
}
memset(buffer, 0, sizeof(buffer));
sprintf(buffer, "%lu", latest_seq_no);
printf("WooFProcesGetLatestSeqno: sening back %s (%lu)\n",buffer,latest_seq_no);
printf("WooFProcesGetLatestSeqno: woof: %s sending back %s (%lu)\n",woof_name,buffer,latest_seq_no);
r_frame = zframe_new(buffer, strlen(buffer));
if (r_frame == NULL)
{
Expand Down Expand Up @@ -805,7 +877,7 @@ printf("WooFProcessGet: called on %s for seqno %lu\n",woof_name,seq_no);
*/
msgid = rand();
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'",
Timeout,
Broker,
Device_name_space,
Expand All @@ -825,7 +897,7 @@ printf("sub_string: %s\n",sub_string);
* request the Get
*/
memset(pub_string,0,sizeof(pub_string));
sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%d\'",
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%d\'",
Broker,
Device_name_space,
User_name,
Expand Down Expand Up @@ -1071,11 +1143,13 @@ void *WooFMsgThread(void *arg)
*/
msg = zmsg_recv(receiver);
}
printf("WooFMsgThread: STARTING OVER on NULL msg\n");

zsock_destroy(&receiver);
goto again;
pthread_exit(NULL);
}
}


int WooFMsgServer(char *wnamespace)
{
Expand Down Expand Up @@ -1661,6 +1735,8 @@ void *MQTTDeviceOutputThread(void *arg)
void *element_buff;
char *element_string;
int err;
char **msgs;
int i;

len = strlen((char *)arg) + 1;
device_name = (char *)malloc(len);
Expand Down Expand Up @@ -1689,7 +1765,7 @@ void *MQTTDeviceOutputThread(void *arg)
* no timeout here
*/
memset(sub_string,0,sizeof(sub_string));
sprintf(sub_string,"/usr/bin/mosquitto_sub -h %s -t %s.output -u \'%s\' -P \'%s\'",
sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -h %s -t %s.output -u \'%s\' -P \'%s\'",
Broker,
device_name,
User_name,
Expand All @@ -1703,18 +1779,23 @@ printf("sub_string: %s\n",sub_string);
if(size <= 0) {
break;
}
printf("input msg from DEVICE: %s\n",mqtt_msg);
wm = ParseMQTTString(mqtt_msg);
if(wm == NULL) {
fprintf(stderr,"MQTTDeviceOutputThread: couldn't parse %s\n",
mqtt_msg);
continue;
}
msgs = ExtractMsgList(mqtt_msg);
if(msgs != NULL) {
i = 0;
while(msgs[i] != NULL) {
printf("input msg from DEVICE: %s\n",msgs[i]);
wm = ParseMQTTString(msgs[i]);
if(wm == NULL) {
fprintf(stderr,"MQTTDeviceOutputThread: couldn't parse %s\n",
msgs[i]);
i++;
continue;
}
/*
* main processing dispatch
*/
memset(resp_string,0,WOOF_MQTT_MAX_SIZE);
switch(wm->command) {
memset(resp_string,0,WOOF_MQTT_MAX_SIZE);
switch(wm->command) {
case WOOF_MQTT_PUT:
seqno = WooFPut(wm->woof_name,
wm->handler_name,
Expand Down Expand Up @@ -1821,22 +1902,26 @@ printf("input msg from DEVICE: %s\n",mqtt_msg);
wm->msgid,
-1);
break;
}
}
//printf("output msg response to DEVICE: %s\n",resp_string);
/*
* send the respond back on the input channel
*/
memset(pub_string,0,sizeof(pub_string));
sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s\'",
memset(pub_string,0,sizeof(pub_string));
sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s\'",
Broker,
device_name,
User_name,
Password,
resp_string);
printf("output msg pub TO DEVICE %s\n",pub_string);
system(pub_string);
FreeWMQTT(wm);
wm = NULL;
system(pub_string);
FreeWMQTT(wm);
wm = NULL;
i++;
}
FreeMsgList(msgs);
}
}


Expand Down

0 comments on commit 9f8555c

Please sign in to comment.