From df69e30b9ab86ed194347198a8c85baf5aaea475 Mon Sep 17 00:00:00 2001 From: David Beck Date: Sun, 18 Feb 2024 23:11:07 +0100 Subject: [PATCH] Add topic prefix in mqtt5 --- inc/finalmq/protocols/ProtocolMqtt5Client.h | 3 +++ src/protocols/ProtocolMqtt5Client.cpp | 30 +++++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/inc/finalmq/protocols/ProtocolMqtt5Client.h b/inc/finalmq/protocols/ProtocolMqtt5Client.h index a5b9c804..a713c85f 100644 --- a/inc/finalmq/protocols/ProtocolMqtt5Client.h +++ b/inc/finalmq/protocols/ProtocolMqtt5Client.h @@ -38,6 +38,7 @@ class SYMBOLEXP ProtocolMqtt5Client : public IProtocol, public IMqtt5ClientCallb static const std::string KEY_PASSWORD; ///< the password for the broker static const std::string KEY_SESSIONEXPIRYINTERVAL; ///< the mqtt session expiry interval in seconds static const std::string KEY_KEEPALIVE; ///< the mqtt keep alive interval in seconds + static const std::string KEY_TOPIC_PREFIX; ///< a topic prefix static const std::uint32_t PROTOCOL_ID; // 5 static const std::string PROTOCOL_NAME; // mqtt5client @@ -101,6 +102,8 @@ class SYMBOLEXP ProtocolMqtt5Client : public IProtocol, public IMqtt5ClientCallb std::string m_password{}; std::uint32_t m_sessionExpiryInterval = 5 * 60; // default 5 minutes std::uint32_t m_keepAlive = 20; // default 20 seconds + std::string m_topicPrefix; + std::string m_sessionPrefix; std::string m_clientId{}; std::string m_virtualSessionId{}; diff --git a/src/protocols/ProtocolMqtt5Client.cpp b/src/protocols/ProtocolMqtt5Client.cpp index e321ed36..924c2f7b 100644 --- a/src/protocols/ProtocolMqtt5Client.cpp +++ b/src/protocols/ProtocolMqtt5Client.cpp @@ -54,6 +54,7 @@ const std::string ProtocolMqtt5Client::KEY_USERNAME = "username"; const std::string ProtocolMqtt5Client::KEY_PASSWORD = "password"; const std::string ProtocolMqtt5Client::KEY_SESSIONEXPIRYINTERVAL = "sessionexpiryinterval"; const std::string ProtocolMqtt5Client::KEY_KEEPALIVE = "keepalive"; +const std::string ProtocolMqtt5Client::KEY_TOPIC_PREFIX = "topicprefix"; static const std::string FMQ_PATH = "fmq_path"; static const std::string FMQ_CORRID = "fmq_corrid"; @@ -63,7 +64,7 @@ static const std::string FMQ_DESTNAME = "fmq_destname"; static const std::string FMQ_SUBPATH = "fmq_subpath"; static const std::string TOPIC_WILLMESSAGE = "/fmq_willmsg"; -static const std::string SESSIONID_PREFIX = "/_id:"; +static const std::string SESSIONID_PREFIX = "/_id/"; static const std::uint8_t ReasonCodeDisconnectWithWillMessage = 0x04; @@ -173,8 +174,17 @@ ProtocolMqtt5Client::ProtocolMqtt5Client(const Variant& data) { m_keepAlive = *entry; } + m_topicPrefix = data.getDataValue(KEY_TOPIC_PREFIX); + if (!m_topicPrefix.empty()) + { + if (m_topicPrefix[0] != '/') + { + m_topicPrefix = "/" + m_topicPrefix; + } + } m_clientId = getUuid(); - m_virtualSessionId = SESSIONID_PREFIX + m_clientId; + m_sessionPrefix = m_topicPrefix + '/' + SESSIONID_PREFIX; + m_virtualSessionId = m_sessionPrefix + m_clientId; m_client->setCallback(this); } @@ -320,6 +330,11 @@ void ProtocolMqtt5Client::sendMessage(IMessagePtr message) std::string topic = message->getControlData().getDataValue(FMQ_VIRTUAL_SESSION_ID); + if (topic.empty()) + { + topic = m_topicPrefix; + } + std::string* destname = message->getControlData().getData(FMQ_DESTNAME); if (destname && !destname->empty()) { @@ -408,7 +423,7 @@ void ProtocolMqtt5Client::subscribe(const std::vector& subscribtion for (size_t i = 0; i < subscribtions.size(); ++i) { const std::string& subscription = subscribtions[i]; - std::string topic = "/" + subscription; + std::string topic = m_topicPrefix + '/' + subscription; data.subscriptions.push_back({topic, 2, false, false, 2}); } @@ -498,15 +513,20 @@ void ProtocolMqtt5Client::receivedPublish(const PublishData& data, const IMessag } else { - if (data.topic.compare(0, SESSIONID_PREFIX.size(), SESSIONID_PREFIX.c_str()) == 0) + if (data.topic.compare(0, m_sessionPrefix.size(), m_sessionPrefix.c_str()) == 0) { - size_t pos = data.topic.find_first_of('/', SESSIONID_PREFIX.size()); + size_t pos = data.topic.find_first_of('/', m_sessionPrefix.size()); if (pos != std::string::npos) { std::string path = &data.topic[pos]; message->addMetainfo(FMQ_PATH, std::move(path)); } } + else if (data.topic.compare(0, m_topicPrefix.size(), m_topicPrefix.c_str()) == 0) + { + std::string path = &data.topic[m_topicPrefix.size()]; + message->addMetainfo(FMQ_PATH, std::move(path)); + } else { message->addMetainfo(FMQ_PATH, data.topic);