From 9f8555c01e17aee1cd701736aa98d255356a5aa9 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Mar 2024 10:55:40 -0800 Subject: [PATCH] anomaly benchmark working with digital twin --- src/include/woofc-access.h | 4 +- src/net/zmq/backend.cpp | 14 ++++ src/net/zmq/client.cpp | 2 +- src/woofc-mqtt-gateway.cpp | 141 +++++++++++++++++++++++++++++-------- 4 files changed, 130 insertions(+), 31 deletions(-) diff --git a/src/include/woofc-access.h b/src/include/woofc-access.h index d90c1b24..d149952d 100644 --- a/src/include/woofc-access.h +++ b/src/include/woofc-access.h @@ -39,9 +39,9 @@ int WooFMsgRepair(char* woof_name, Dlist* holes); /* * 2 minute timeout */ -// #define WOOF_MSG_REQ_TIMEOUT (120000) +#define WOOF_MSG_REQ_TIMEOUT (120000) //#define WOOF_MSG_REQ_TIMEOUT (500) -#define WOOF_MSG_REQ_TIMEOUT (10000) +//#define WOOF_MSG_REQ_TIMEOUT (10000) #define WOOF_MSG_THREADS (15) diff --git a/src/net/zmq/backend.cpp b/src/net/zmq/backend.cpp index 345d4f64..4110c0a8 100644 --- a/src/net/zmq/backend.cpp +++ b/src/net/zmq/backend.cpp @@ -27,6 +27,8 @@ per_endpoint_data* backend::get_local_socket_for(const std::string& endpoint) { ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) { auto ep_data = get_local_socket_for(endpoint); if (!ep_data) { + printf("ServerRequest: failed to create endpoint for %s\n",endpoint); + fflush(stdout); return nullptr; } @@ -34,17 +36,27 @@ ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) { if (!sent) { DEBUG_WARN("ServerRequest: msg send to %s failed\n", endpoint); + printf("ServerRequest: msg send to %s failed\n", endpoint); + fflush(stdout); return nullptr; } +#undef WOOF_MSG_REQ_TIMEOUT +#define WOOF_MSG_REQ_TIMEOUT (120000) auto server_resp = static_cast(zpoller_wait(ep_data->resp_poll.get(), WOOF_MSG_REQ_TIMEOUT)); if (!server_resp) { if (zpoller_expired(ep_data->resp_poll.get())) { DEBUG_WARN("ServerRequest: msg recv timeout from %s after %d msec\n", endpoint, WOOF_MSG_REQ_TIMEOUT); + printf("ServerRequest: msg recv timeout from %s after %d msec\n", endpoint, WOOF_MSG_REQ_TIMEOUT); + fflush(stdout); } else if (zpoller_terminated(ep_data->resp_poll.get())) { DEBUG_WARN("ServerRequest: msg recv interrupted from %s\n", endpoint); + printf("ServerRequest: msg recv interrupted from %s\n", endpoint); + fflush(stdout); } else { DEBUG_WARN("ServerRequest: msg recv failed from %s\n", endpoint); + printf("ServerRequest: msg recv interrupted from %s\n", endpoint); + fflush(stdout); } return nullptr; } @@ -53,6 +65,8 @@ ZMsgPtr backend::ServerRequest(const char* endpoint, ZMsgPtr msg) { if (!resp) { DEBUG_WARN("ServerRequest: msg recv from %s failed\n", endpoint); + printf("ServerRequest: msg recv from %s failed\n", endpoint); + fflush(stdout); } DEBUG_LOG("ServerRequest: completed successfully"); diff --git a/src/net/zmq/client.cpp b/src/net/zmq/client.cpp index 7b82b0e4..0d4b4efc 100644 --- a/src/net/zmq/client.cpp +++ b/src/net/zmq/client.cpp @@ -231,4 +231,4 @@ int32_t backend::remote_get_latest_seq_no(std::string_view woof_name, auto& [str] = *res; return std::stoul(str); } -} // namespace cspot::zmq \ No newline at end of file +} // namespace cspot::zmq diff --git a/src/woofc-mqtt-gateway.cpp b/src/woofc-mqtt-gateway.cpp index 37eb53c8..c0082871 100644 --- a/src/woofc-mqtt-gateway.cpp +++ b/src/woofc-mqtt-gateway.cpp @@ -19,7 +19,7 @@ extern "C" { #include "woofc-access.h" #include "woofc-mqtt.h" -#define WOOF_MQTT_MSG_THREADS (1) +#define WOOF_MQTT_MSG_THREADS (20) #define WOOF_MQTT_MSG_REQ_TIMEOUT (10000) #define IPLEN (17) @@ -35,6 +35,78 @@ struct timespec ardino_delay = {1,0}; /* 1/3 second for lwip tcp memory leak */ #define DEFAULT_MQTT_TIMEOUT (3) + +void FreeMsgList(char **msgs) +{ + int i; + if(msgs == NULL) { + return; + } + i = 0; + while(msgs[i] != NULL) { + free(msgs[i]); + i++; + } + free(msgs); + return; +} + +/* + * mosquitto could pack multiple messages into a single read() + */ +char **ExtractMsgList(char *msg) +{ + char *curr; + char *next; + int count; + char **msgs; + char *cmsg; + int len; + + /* + * count the new lines + */ + count = 0; + curr = msg; + while(*curr != 0) { + if(*curr == '\n') { + count++; + } + curr++; + } + + /* + * need count+ messages and 1 extra for NULL + */ + msgs = (char **)malloc((count+2)*sizeof(char *)); + if(msgs == NULL) { + printf("no space for message list\n"); + return(NULL); + } + count = 0; + curr = msg; + next = strstr(curr,"\n"); + while(next != NULL) { + len = next - curr + 1; + cmsg = (char *)malloc(len); + if(cmsg == NULL) { + printf("no space for cmsg\n"); + FreeMsgList(msgs); + return(NULL); + } + memset(cmsg,0,len); + strncpy(cmsg,curr,len-1); + msgs[count] = cmsg; +printf("extracted msg[%d]: %s\n",count,cmsg); + curr = next+1; + count++; + next = strstr(curr,"\n"); + } + msgs[count] = NULL; + return(msgs); +} + + extern "C" { static zmsg_t *WooFMQTTRequest(char *endpoint, zmsg_t *msg) { @@ -276,7 +348,7 @@ printf("WooFProcessPut: called on %s with size %d\n",woof_name,copy_size); * use msgid to get back specific response */ memset(sub_string,0,sizeof(sub_string)); - sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", + sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", Timeout, Broker, Device_name_space, @@ -299,7 +371,7 @@ printf("sub_string: %s\n",sub_string); if(hand_name[0] == 0) { strncpy(hand_name,"NULL",sizeof(hand_name)); } - sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%s|%s\'", + sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%s|%s\'", Broker, Device_name_space, User_name, @@ -452,7 +524,7 @@ void WooFProcessGetElSize(zmsg_t *req_msg, zsock_t *receiver) * use msgid to get back specific response */ memset(sub_string,0,sizeof(sub_string)); - sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", + sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", Timeout, Broker, Device_name_space, @@ -470,7 +542,7 @@ printf("sub_string: %s\n",sub_string); * create the mqtt message to put to the device */ memset(pub_string,0,sizeof(pub_string)); - sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'", + sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'", Broker, Device_name_space, User_name, @@ -531,7 +603,7 @@ printf("WooFProcessGetElSize received string: %s\n",resp_string); } memset(buffer, 0, sizeof(buffer)); sprintf(buffer, "%lu", el_size); -printf("WooFProcesGetElSize: sening back %s (%lu)\n",buffer,el_size); +printf("WooFProcesGetElSize: woof: %s sending back %s (%lu)\n",woof_name,buffer,el_size); r_frame = zframe_new(buffer, strlen(buffer)); if (r_frame == NULL) { @@ -621,7 +693,7 @@ void WooFProcessGetLatestSeqno(zmsg_t *req_msg, zsock_t *receiver) * use msgid to get back specific response */ memset(sub_string,0,sizeof(sub_string)); - sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", + sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", Timeout, Broker, Device_name_space, @@ -639,7 +711,7 @@ printf("sub_string: %s\n",sub_string); * create the mqtt message to put to the device */ memset(pub_string,0,sizeof(pub_string)); - sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'", + sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d\'", Broker, Device_name_space, User_name, @@ -700,7 +772,7 @@ printf("WooFProcessGetLatestSeqno resp string: %s\n",resp_string); } memset(buffer, 0, sizeof(buffer)); sprintf(buffer, "%lu", latest_seq_no); -printf("WooFProcesGetLatestSeqno: sening back %s (%lu)\n",buffer,latest_seq_no); +printf("WooFProcesGetLatestSeqno: woof: %s sending back %s (%lu)\n",woof_name,buffer,latest_seq_no); r_frame = zframe_new(buffer, strlen(buffer)); if (r_frame == NULL) { @@ -805,7 +877,7 @@ printf("WooFProcessGet: called on %s for seqno %lu\n",woof_name,seq_no); */ msgid = rand(); memset(sub_string,0,sizeof(sub_string)); - sprintf(sub_string,"/usr/bin/mosquitto_sub -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", + sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -W %d -C 1 -h %s -t %s.%d -u \'%s\' -P \'%s\'", Timeout, Broker, Device_name_space, @@ -825,7 +897,7 @@ printf("sub_string: %s\n",sub_string); * request the Get */ memset(pub_string,0,sizeof(pub_string)); - sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%d\'", + sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s|%d|%d|%d\'", Broker, Device_name_space, User_name, @@ -1071,11 +1143,13 @@ void *WooFMsgThread(void *arg) */ msg = zmsg_recv(receiver); } +printf("WooFMsgThread: STARTING OVER on NULL msg\n"); zsock_destroy(&receiver); goto again; pthread_exit(NULL); -} +} + int WooFMsgServer(char *wnamespace) { @@ -1661,6 +1735,8 @@ void *MQTTDeviceOutputThread(void *arg) void *element_buff; char *element_string; int err; + char **msgs; + int i; len = strlen((char *)arg) + 1; device_name = (char *)malloc(len); @@ -1689,7 +1765,7 @@ void *MQTTDeviceOutputThread(void *arg) * no timeout here */ memset(sub_string,0,sizeof(sub_string)); - sprintf(sub_string,"/usr/bin/mosquitto_sub -h %s -t %s.output -u \'%s\' -P \'%s\'", + sprintf(sub_string,"/usr/bin/mosquitto_sub -q 1 -h %s -t %s.output -u \'%s\' -P \'%s\'", Broker, device_name, User_name, @@ -1703,18 +1779,23 @@ printf("sub_string: %s\n",sub_string); if(size <= 0) { break; } -printf("input msg from DEVICE: %s\n",mqtt_msg); - wm = ParseMQTTString(mqtt_msg); - if(wm == NULL) { - fprintf(stderr,"MQTTDeviceOutputThread: couldn't parse %s\n", - mqtt_msg); - continue; - } + msgs = ExtractMsgList(mqtt_msg); + if(msgs != NULL) { + i = 0; + while(msgs[i] != NULL) { +printf("input msg from DEVICE: %s\n",msgs[i]); + wm = ParseMQTTString(msgs[i]); + if(wm == NULL) { + fprintf(stderr,"MQTTDeviceOutputThread: couldn't parse %s\n", + msgs[i]); + i++; + continue; + } /* * main processing dispatch */ - memset(resp_string,0,WOOF_MQTT_MAX_SIZE); - switch(wm->command) { + memset(resp_string,0,WOOF_MQTT_MAX_SIZE); + switch(wm->command) { case WOOF_MQTT_PUT: seqno = WooFPut(wm->woof_name, wm->handler_name, @@ -1821,22 +1902,26 @@ printf("input msg from DEVICE: %s\n",mqtt_msg); wm->msgid, -1); break; - } + } //printf("output msg response to DEVICE: %s\n",resp_string); /* * send the respond back on the input channel */ - memset(pub_string,0,sizeof(pub_string)); - sprintf(pub_string,"/usr/bin/mosquitto_pub -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s\'", + memset(pub_string,0,sizeof(pub_string)); + sprintf(pub_string,"/usr/bin/mosquitto_pub -q 1 -h %s -t %s.input -u \'%s\' -P \'%s\' -m \'%s\'", Broker, device_name, User_name, Password, resp_string); printf("output msg pub TO DEVICE %s\n",pub_string); - system(pub_string); - FreeWMQTT(wm); - wm = NULL; + system(pub_string); + FreeWMQTT(wm); + wm = NULL; + i++; + } + FreeMsgList(msgs); + } }