Skip to content

Commit

Permalink
Add isSyncronousRequestReply flag to protocol. Give ProtocolSession
Browse files Browse the repository at this point in the history
ability to connect multiple times. Add ProtocolHttpClient
  • Loading branch information
beckdave committed Jul 3, 2023
1 parent 0c1ffc6 commit 3deedae
Show file tree
Hide file tree
Showing 20 changed files with 1,753 additions and 73 deletions.
1 change: 1 addition & 0 deletions inc/finalmq/protocols/ProtocolHeaderBinarySize.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SYMBOLEXP ProtocolHeaderBinarySize : public IProtocol
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual FuncCreateMessage getMessageFactory() const override;
virtual void sendMessage(IMessagePtr message) override;
virtual void moveOldProtocolState(IProtocol& protocolOld) override;
Expand Down
143 changes: 143 additions & 0 deletions inc/finalmq/protocols/ProtocolHttpClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//MIT License

//Copyright (c) 2020 bexoft GmbH (mail@bexoft.de)

//Permission is hereby granted, free of charge, to any person obtaining a copy
//of this software and associated documentation files (the "Software"), to deal
//in the Software without restriction, including without limitation the rights
//to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the Software is
//furnished to do so, subject to the following conditions:

//The above copyright notice and this permission notice shall be included in all
//copies or substantial portions of the Software.

//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//SOFTWARE.

#pragma once

#include "finalmq/streamconnection/IMessage.h"
#include "finalmq/protocolsession/IProtocol.h"
#include "finalmq/helpers/FmqDefines.h"
#include "finalmq/helpers/Executor.h"

#include <random>


namespace finalmq {


class SYMBOLEXP ProtocolHttpClient : public IProtocol
, public std::enable_shared_from_this<ProtocolHttpClient>
{
public:
static const std::uint32_t PROTOCOL_ID; // 7
static const std::string PROTOCOL_NAME; // httpclient

static const std::string FMQ_HTTP;
static const std::string FMQ_METHOD;
static const std::string FMQ_PROTOCOL;
static const std::string FMQ_PATH;
static const std::string FMQ_QUERY_PREFIX;
static const std::string FMQ_HTTP_STATUS;
static const std::string FMQ_HTTP_STATUSTEXT;
static const std::string HTTP_REQUEST;
static const std::string HTTP_RESPONSE;

ProtocolHttpClient();
virtual ~ProtocolHttpClient();

private:

// IProtocol
virtual void setCallback(const std::weak_ptr<IProtocolCallback>& callback) override;
virtual void setConnection(const IStreamConnectionPtr& connection) override;
virtual IStreamConnectionPtr getConnection() const override;
virtual void disconnect() override;
virtual std::uint32_t getProtocolId() const override;
virtual bool areMessagesResendable() const override;
virtual bool doesSupportMetainfo() const override;
virtual bool doesSupportSession() const override;
virtual bool needsReply() const override;
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual FuncCreateMessage getMessageFactory() const override;
virtual void sendMessage(IMessagePtr message) override;
virtual void moveOldProtocolState(IProtocol& protocolOld) override;
virtual bool received(const IStreamConnectionPtr& connection, const SocketPtr& socket, int bytesToRead) override;
virtual hybrid_ptr<IStreamConnectionCallback> connected(const IStreamConnectionPtr& connection) override;
virtual void disconnected(const IStreamConnectionPtr& connection) override;
virtual IMessagePtr pollReply(std::deque<IMessagePtr>&& messages) override;
virtual void subscribe(const std::vector<std::string>& subscribtions) override;
virtual void cycleTime() override;


bool receiveHeaders(ssize_t bytesReceived);
void reset();
std::string createSessionName();
void cookiesToSessionIds(const std::string& cookies);
// bool handleInternalCommands(const std::shared_ptr<IProtocolCallback>& callback, bool& ok);

enum class State
{
STATE_FIND_FIRST_LINE,
STATE_FIND_HEADERS,
STATE_CONTENT,
STATE_CONTENT_DONE
};

enum class StateSessionId
{
SESSIONID_NONE = 0,
SESSIONID_COOKIE = 1,
SESSIONID_FMQ = 2
};

std::random_device m_randomDevice;
std::mt19937 m_randomGenerator;
std::uniform_int_distribution<std::uint64_t> m_randomVariable;
IMessage::Metainfo m_headerSendNext;
StateSessionId m_stateSessionId = StateSessionId::SESSIONID_NONE;
std::vector<std::string> m_sessionNames;

State m_state = State::STATE_FIND_FIRST_LINE;
std::string m_receiveBuffer;
ssize_t m_offsetRemaining = 0;
ssize_t m_sizeRemaining = 0;
IMessagePtr m_message;
ssize_t m_contentLength = 0;
ssize_t m_indexFilled = 0;
std::string m_headerHost;
std::int64_t m_connectionId = 0;
bool m_createSession = false;
std::string m_sessionName;
std::weak_ptr<IProtocolCallback> m_callback;
IStreamConnectionPtr m_connection;
bool m_multipart = false;

// path
std::string* m_path = nullptr;

mutable std::mutex m_mutex;
static std::atomic_int64_t m_nextSessionNameCounter;
};


class SYMBOLEXP ProtocolHttpClientFactory : public IProtocolFactory
{
public:

private:
// IProtocolFactory
virtual IProtocolPtr createProtocol(const Variant& data) override;
};

} // namespace finalmq
1 change: 1 addition & 0 deletions inc/finalmq/protocols/ProtocolHttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class SYMBOLEXP ProtocolHttpServer : public IProtocol
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual FuncCreateMessage getMessageFactory() const override;
virtual void sendMessage(IMessagePtr message) override;
virtual void moveOldProtocolState(IProtocol& protocolOld) override;
Expand Down
1 change: 1 addition & 0 deletions inc/finalmq/protocols/ProtocolMqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class SYMBOLEXP ProtocolMqtt5Client : public IProtocol
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual FuncCreateMessage getMessageFactory() const override;
virtual void sendMessage(IMessagePtr message) override;
virtual void moveOldProtocolState(IProtocol& protocolOld) override;
Expand Down
1 change: 1 addition & 0 deletions inc/finalmq/protocols/ProtocolStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SYMBOLEXP ProtocolStream : public IProtocol
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual FuncCreateMessage getMessageFactory() const override;
virtual void sendMessage(IMessagePtr message) override;
virtual void moveOldProtocolState(IProtocol& protocolOld) override;
Expand Down
1 change: 1 addition & 0 deletions inc/finalmq/protocols/protocolhelpers/ProtocolDelimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SYMBOLEXP ProtocolDelimiter : public IProtocol
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual FuncCreateMessage getMessageFactory() const override;
virtual void sendMessage(IMessagePtr message) override;
virtual void moveOldProtocolState(IProtocol& protocolOld) override;
Expand Down
1 change: 1 addition & 0 deletions inc/finalmq/protocolsession/IProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct IProtocol : public IStreamConnectionCallback
virtual bool isMultiConnectionSession() const = 0;
virtual bool isSendRequestByPoll() const = 0;
virtual bool doesSupportFileTransfer() const = 0;
virtual bool isSynchronousRequestReply() const = 0;
virtual FuncCreateMessage getMessageFactory() const = 0;
virtual void sendMessage(IMessagePtr message) = 0;
virtual void moveOldProtocolState(IProtocol& protocolOld) = 0;
Expand Down
1 change: 1 addition & 0 deletions inc/finalmq/protocolsession/IProtocolSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct IProtocolSession
virtual bool isMultiConnectionSession() const = 0;
virtual bool isSendRequestByPoll() const = 0;
virtual bool doesSupportFileTransfer() const = 0;
virtual bool isSynchronousRequestReply() const = 0;
virtual void disconnect() = 0;
// virtual bool connect(const std::string& endpoint, const ConnectProperties& connectionProperties = {}) = 0;
virtual bool connect(const std::string& endpoint, const ConnectProperties& connectionProperties = {}, int contentType = 0) = 0;
Expand Down
25 changes: 19 additions & 6 deletions inc/finalmq/protocolsession/ProtocolSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@
#include "ProtocolSessionList.h"
#include "IProtocolSession.h"

#include <unordered_set>

namespace finalmq {




struct IProtocolSessionPrivate : public IProtocolSession
{
virtual bool connect() = 0;
Expand All @@ -60,7 +59,7 @@ class ProtocolSession : public IProtocolSessionPrivate
{
public:
ProtocolSession(hybrid_ptr<IProtocolSessionCallback> callback, const IExecutorPtr& executor, const IExecutorPtr& executorPollerThread, const IProtocolPtr& protocol, const std::shared_ptr<IProtocolSessionList>& protocolSessionList, const BindProperties& bindProperties, int contentType);
ProtocolSession(hybrid_ptr<IProtocolSessionCallback> callback, const IExecutorPtr& executor, const IExecutorPtr& executorPollerThread, const IProtocolPtr& protocol, const std::shared_ptr<IProtocolSessionList>& protocolSessionList, const std::shared_ptr<IStreamConnectionContainer>& streamConnectionContainer, const std::string& endpointStreamConnection, const ConnectProperties& connectProperties, int contentType);
ProtocolSession(hybrid_ptr<IProtocolSessionCallback> callback, const IExecutorPtr& executor, const IExecutorPtr& executorPollerThread, const IProtocolFactoryPtr& protocolFactory, const std::shared_ptr<IProtocolSessionList>& protocolSessionList, const std::shared_ptr<IStreamConnectionContainer>& streamConnectionContainer, const std::string& endpointStreamConnection, const ConnectProperties& connectProperties, int contentType);
ProtocolSession(hybrid_ptr<IProtocolSessionCallback> callback, const IExecutorPtr& executor, const IExecutorPtr& executorPollerThread, const std::shared_ptr<IProtocolSessionList>& protocolSessionList, const std::shared_ptr<IStreamConnectionContainer>& streamConnectionContainer);

virtual ~ProtocolSession();
Expand All @@ -77,6 +76,7 @@ class ProtocolSession : public IProtocolSessionPrivate
virtual bool isMultiConnectionSession() const override;
virtual bool isSendRequestByPoll() const override;
virtual bool doesSupportFileTransfer() const override;
virtual bool isSynchronousRequestReply() const override;
virtual void disconnect() override;
// virtual bool connect(const std::string& endpoint, const ConnectProperties& connectionProperties = {}) override;
virtual bool connect(const std::string& endpoint, const ConnectProperties& connectionProperties = {}, int contentType = 0) override;
Expand Down Expand Up @@ -122,12 +122,19 @@ class ProtocolSession : public IProtocolSessionPrivate
void cleanupMultiConnection();
void pollRelease();

bool hasPendingRequests() const;
IProtocolPtr allocateRequestConnection();
IProtocolPtr createRequestConnection();
void sendNextRequests();


const hybrid_ptr<IProtocolSessionCallback> m_callback;
const IExecutorPtr m_executor;
const IExecutorPtr m_executorPollerThread;
IProtocolPtr m_protocol;
std::atomic<std::int64_t> m_connectionId{0};
std::unordered_map<std::int64_t, IProtocolPtr> m_multiProtocols;
std::unordered_set<std::int64_t> m_unallocatedConnections;

const std::weak_ptr<IProtocolSessionList> m_protocolSessionList;
const int64_t m_sessionId = 0;
Expand All @@ -140,21 +147,27 @@ class ProtocolSession : public IProtocolSessionPrivate
std::atomic<bool> m_protocolFlagNeedsReply{false};
std::atomic<bool> m_protocolFlagIsMultiConnectionSession{false};
std::atomic<bool> m_protocolFlagIsSendRequestByPoll{false};
std::atomic<bool> m_protocolFlagSupportFileTransfer{false};
std::atomic<bool> m_protocolFlagSupportFileTransfer{ false };
std::atomic<bool> m_protocolFlagSynchronousRequestReply{ false };

IProtocolFactoryPtr m_protocolFactory;
IProtocol::FuncCreateMessage m_messageFactory;
std::atomic_bool m_protocolSet{false};
bool m_triggerConnected = false;
bool m_triggerDisconnected = false;
bool m_triggeredConnected = false;
bool m_callConnect = false;
bool m_triggeredDisconnected = false;

const std::shared_ptr<IStreamConnectionContainer> m_streamConnectionContainer;
std::string m_endpointStreamConnection;

const BindProperties m_bindProperties;
ConnectProperties m_connectionProperties;
Variant m_protocolData;
Variant m_formatData;
int m_maxSynchReqRepConnections = -1;

std::deque<IMessagePtr> m_messagesBuffered;
std::unordered_map<std::int64_t, Variant> m_runningRequests;

std::deque<IMessagePtr> m_pollMessages;
int m_pollMaxRequests = 10000;
Expand Down
5 changes: 5 additions & 0 deletions src/protocols/ProtocolHeaderBinarySize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ bool ProtocolHeaderBinarySize::doesSupportFileTransfer() const
return false;
}

bool ProtocolHeaderBinarySize::isSynchronousRequestReply() const
{
return false;
}

IProtocol::FuncCreateMessage ProtocolHeaderBinarySize::getMessageFactory() const
{
return []() {
Expand Down
Loading

0 comments on commit 3deedae

Please sign in to comment.