From 5d0149aec633fd17766be4e56c83828b3f292707 Mon Sep 17 00:00:00 2001 From: Shou-Li Hsu Date: Wed, 7 Aug 2024 18:21:18 +0800 Subject: [PATCH] Introduce Unix Domain Socket Transport Layers (#20) * Add asio 1.28.2 dependency * Add PipeTransport class with Unix domain socket support * Upgrade project to C++20 and add FramedPipeTransport implementation --- .bazelrc | 4 +- CMakeLists.txt | 16 ++- MODULE.bazel | 3 + MODULE.bazel.lock | 2 + README.md | 24 ++-- conanfile.py | 3 +- examples/calculator.hpp | 4 +- examples/framed_pipe_client.cpp | 35 +++++ examples/framed_pipe_server.cpp | 41 ++++++ examples/pipe_client.cpp | 35 +++++ examples/pipe_server.cpp | 42 ++++++ examples/stdio_client.cpp | 9 +- examples/stdio_server.cpp | 15 +- .../transport/framed_pipe_transport.hpp | 27 ++++ .../jsonrpc/transport/framed_transport.hpp | 33 ++++- include/jsonrpc/transport/pipe_transport.hpp | 48 +++++++ include/jsonrpc/utils/string_utils.hpp | 19 +++ src/BUILD.bazel | 1 + src/client/client.cpp | 1 + src/server/server.cpp | 3 + src/transport/framed_pipe_transport.cpp | 74 ++++++++++ src/transport/framed_stdio_transport.cpp | 2 +- src/transport/framed_transport.cpp | 79 ++++++----- src/transport/pipe_transport.cpp | 97 +++++++++++++ src/transport/stdio_transport.cpp | 4 +- tests/BUILD.bazel | 10 ++ tests/transports/test_framed_transport.cpp | 134 ++++++++++++------ tests/transports/test_pipe_transport.cpp | 68 +++++++++ 28 files changed, 716 insertions(+), 117 deletions(-) create mode 100644 examples/framed_pipe_client.cpp create mode 100644 examples/framed_pipe_server.cpp create mode 100644 examples/pipe_client.cpp create mode 100644 examples/pipe_server.cpp create mode 100644 include/jsonrpc/transport/framed_pipe_transport.hpp create mode 100644 include/jsonrpc/transport/pipe_transport.hpp create mode 100644 include/jsonrpc/utils/string_utils.hpp create mode 100644 src/transport/framed_pipe_transport.cpp create mode 100644 src/transport/pipe_transport.cpp create mode 100644 tests/transports/test_pipe_transport.cpp diff --git a/.bazelrc b/.bazelrc index 85c6cef..fca820a 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,7 +1,7 @@ # .bazelrc -# Set the C++ standard to C++17 -build --cxxopt='-std=c++17' +# Set the C++ standard to C++20 +build --cxxopt='-std=c++20' # Enable warnings build --cxxopt='-Wall' diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d3e7ea..060cb82 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.19) project(jsonrpc VERSION 1.0.0 LANGUAGES CXX) # Set C++ standard -set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) # Set C++ Compiler Launcher @@ -16,6 +16,7 @@ file(GLOB_RECURSE SOURCES "src/*.cpp") find_package(nlohmann_json REQUIRED) find_package(spdlog REQUIRED) find_package(bshoshany-thread-pool REQUIRED) +find_package(asio REQUIRED) # Add the library target add_library(jsonrpc ${SOURCES}) @@ -28,6 +29,7 @@ target_link_libraries(jsonrpc PUBLIC nlohmann_json::nlohmann_json spdlog::spdlog bshoshany-thread-pool::bshoshany-thread-pool + asio::asio ) # Add example executables @@ -37,5 +39,17 @@ target_link_libraries(stdio_client PRIVATE jsonrpc) add_executable(stdio_server examples/stdio_server.cpp) target_link_libraries(stdio_server PRIVATE jsonrpc) +add_executable(pipe_client examples/pipe_client.cpp) +target_link_libraries(pipe_client PRIVATE jsonrpc) + +add_executable(pipe_server examples/pipe_server.cpp) +target_link_libraries(pipe_server PRIVATE jsonrpc) + +add_executable(framed_pipe_client examples/framed_pipe_client.cpp) +target_link_libraries(framed_pipe_client PRIVATE jsonrpc) + +add_executable(framed_pipe_server examples/framed_pipe_server.cpp) +target_link_libraries(framed_pipe_server PRIVATE jsonrpc) + enable_testing() add_subdirectory(tests) diff --git a/MODULE.bazel b/MODULE.bazel index 8e04af5..042c5eb 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -15,5 +15,8 @@ bazel_dep(name = "nlohmann_json", version = "3.11.3") # Register the spdlog dependency bazel_dep(name = "spdlog", version = "1.14.1") +# Register the asio dependency +bazel_dep(name = "asio", version = "1.28.2") + # Register the catch2 dependency bazel_dep(name = "catch2", version = "3.6.0") diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 226f8f8..02eb667 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -7,6 +7,8 @@ "https://bcr.bazel.build/modules/abseil-cpp/20211102.0/source.json": "7e3a9adf473e9af076ae485ed649d5641ad50ec5c11718103f34de03170d94ad", "https://bcr.bazel.build/modules/apple_support/1.5.0/MODULE.bazel": "50341a62efbc483e8a2a6aec30994a58749bd7b885e18dd96aa8c33031e558ef", "https://bcr.bazel.build/modules/apple_support/1.5.0/source.json": "eb98a7627c0bc486b57f598ad8da50f6625d974c8f723e9ea71bd39f709c9862", + "https://bcr.bazel.build/modules/asio/1.28.2/MODULE.bazel": "c985926a85680be7047309db9fb56f2e1c590394adbe48f309ee76d4c78ab8ce", + "https://bcr.bazel.build/modules/asio/1.28.2/source.json": "10d8d6af06420092fa73c6e7e79e36dee3c166c1e668ab2edefd4c497c37bbcf", "https://bcr.bazel.build/modules/bazel_features/1.11.0/MODULE.bazel": "f9382337dd5a474c3b7d334c2f83e50b6eaedc284253334cf823044a26de03e8", "https://bcr.bazel.build/modules/bazel_features/1.11.0/source.json": "c9320aa53cd1c441d24bd6b716da087ad7e4ff0d9742a9884587596edfe53015", "https://bcr.bazel.build/modules/bazel_skylib/1.0.3/MODULE.bazel": "bcb0fd896384802d1ad283b4e4eb4d718eebd8cb820b0a2c3a347fb971afd9d8", diff --git a/README.md b/README.md index 5fcf7c4..58e6503 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,19 @@ Welcome to the **JSON-RPC 2.0 Modern C++ Library**! This library provides a ligh ## ✨ Features - **Fully Compliant with JSON-RPC 2.0**: Supports method calls, notifications, comprehensive error handling, and batch requests. -- **Modern and Lightweight**: Leverages C++17 features with minimal dependencies, focusing solely on the JSON-RPC protocol. +- **Modern and Lightweight**: Leverages C++20 features with minimal dependencies, focusing solely on the JSON-RPC protocol. - **Transport-Agnostic**: Abstract transport layer allows use of provided implementations or custom ones. - **Simple JSON Integration**: Uses [nlohmann/json](https://github.com/nlohmann/json) for easy JSON object interaction, requiring no learning curve. - **Flexible Handler Registration**: Register handlers using `std::function`, supporting lambdas, function pointers, and other callable objects. ## 🚀 Getting Started +### Prerequisites + +- **Compiler**: Any compiler with C++20 support. +- **CMake**: Version 3.19+ (for CMake preset support). +- **Bazel**: Version 5.0+ (for Bazel module support). + To include this library in your project, you can use CMake's FetchContent, Conan 2, or Bazel. ### Using CMake FetchContent @@ -59,11 +65,12 @@ bazel_dep(name = "jsonrpc", version = "1.0.0") Here’s how to create a simple JSON-RPC server: ```cpp -using namespace jsonrpc; +using namespace jsonrpc::server; +using namespace jsonrpc::transport; using Json = nlohmann::json; // Create a server with an stdio transport -server::Server server(std::make_unique()); +Server server(std::make_unique()); // Register a method named "add" that adds two numbers server.RegisterMethodCall("add", [](const std::optional ¶ms) { @@ -87,16 +94,17 @@ To register a method, you need to provide a function that takes optional `Json` Here’s how to create a JSON-RPC client: ```cpp -using namespace jsonrpc; +using namespace jsonrpc::client; +using namespace jsonrpc::transport; using Json = nlohmann::json; -// Create a client with an stdio transport -client::Client client(std::make_unique()); +// Create a client with a standard I/O transport +Client client(std::make_unique()); client.Start(); // Perform addition auto response = client.SendMethodCall("add", Json({{"a", 10}, {"b", 5}})); -std::cout << "Add result: " << response.dump() << std::endl; +spdlog::info("Add result: {}", response.dump()); // Send stop notification client.SendNotification("stop"); @@ -140,6 +148,7 @@ Next, install dependencies and generate `ConanPresets.json`: ```bash conan install . --build=missing +conan install . -s build_type=Debug --build=missing ``` **Step 2: Configure and Build the Project** @@ -166,7 +175,6 @@ ctest --preset release For Debug configuration: ```bash -conan install . -s build_type=Debug --build=missing cmake --preset debug cmake --build --preset debug ctest --preset debug diff --git a/conanfile.py b/conanfile.py index 4e4a345..ed66d9f 100644 --- a/conanfile.py +++ b/conanfile.py @@ -24,7 +24,8 @@ class JsonRpcConan(ConanFile): requires = [ "nlohmann_json/3.11.3", "spdlog/1.14.1", - "bshoshany-thread-pool/4.1.0" + "bshoshany-thread-pool/4.1.0", + "asio/1.28.2" ] tool_requires = [ diff --git a/examples/calculator.hpp b/examples/calculator.hpp index 5bf2572..8da6d74 100644 --- a/examples/calculator.hpp +++ b/examples/calculator.hpp @@ -8,14 +8,14 @@ using Json = nlohmann::json; class Calculator { public: Json Add(const Json ¶ms) { - spdlog::info("Received add request with params: {}", params.dump()); + spdlog::debug("Received add request with params: {}", params.dump()); double a = params["a"]; double b = params["b"]; return {{"result", a + b}}; } Json Divide(const Json ¶ms) { - spdlog::info("Received divide request with params: {}", params.dump()); + spdlog::debug("Received divide request with params: {}", params.dump()); double a = params["a"]; double b = params["b"]; if (b == 0) { diff --git a/examples/framed_pipe_client.cpp b/examples/framed_pipe_client.cpp new file mode 100644 index 0000000..ee2d1c3 --- /dev/null +++ b/examples/framed_pipe_client.cpp @@ -0,0 +1,35 @@ +#include +#include + +#include +#include +#include +#include +#include + +using namespace jsonrpc::client; +using namespace jsonrpc::transport; +using Json = nlohmann::json; + +int main() { + auto logger = spdlog::basic_logger_mt("client", "logs/client.log", true); + spdlog::set_default_logger(logger); + spdlog::set_level(spdlog::level::debug); + spdlog::flush_on(spdlog::level::debug); + + std::string socketPath = "/tmp/calculator_pipe"; + auto transport = std::make_unique(socketPath, false); + Client client(std::move(transport)); + client.Start(); + + Json addRes = client.SendMethodCall("add", Json({{"a", 10}, {"b", 5}})); + spdlog::info("Add result: {}", addRes.dump()); + + Json divRes = client.SendMethodCall("divide", Json({{"a", 10}, {"b", 0}})); + spdlog::info("Divide result: {}", divRes.dump()); + + client.SendNotification("stop"); + + client.Stop(); + return 0; +} diff --git a/examples/framed_pipe_server.cpp b/examples/framed_pipe_server.cpp new file mode 100644 index 0000000..22f4397 --- /dev/null +++ b/examples/framed_pipe_server.cpp @@ -0,0 +1,41 @@ +#include + +#include +#include +#include +#include +#include + +#include "calculator.hpp" + +using namespace jsonrpc::server; +using namespace jsonrpc::transport; +using Json = nlohmann::json; + +int main() { + auto logger = spdlog::basic_logger_mt("server", "logs/server.log", true); + spdlog::set_default_logger(logger); + spdlog::set_level(spdlog::level::debug); + spdlog::flush_on(spdlog::level::debug); + + std::string socketPath = "/tmp/calculator_pipe"; + auto transport = std::make_unique(socketPath, true); + Server server(std::move(transport)); + Calculator calculator; + + server.RegisterMethodCall( + "add", [&calculator](const std::optional ¶ms) { + return calculator.Add(params.value()); + }); + + server.RegisterMethodCall( + "divide", [&calculator](const std::optional ¶ms) { + return calculator.Divide(params.value()); + }); + + server.RegisterNotification( + "stop", [&server](const std::optional &) { server.Stop(); }); + + server.Start(); + return 0; +} diff --git a/examples/pipe_client.cpp b/examples/pipe_client.cpp new file mode 100644 index 0000000..6453f4a --- /dev/null +++ b/examples/pipe_client.cpp @@ -0,0 +1,35 @@ +#include +#include + +#include +#include +#include +#include +#include + +using namespace jsonrpc::client; +using namespace jsonrpc::transport; +using Json = nlohmann::json; + +int main() { + auto logger = spdlog::basic_logger_mt("client", "logs/client.log", true); + spdlog::set_default_logger(logger); + spdlog::set_level(spdlog::level::debug); + spdlog::flush_on(spdlog::level::debug); + + std::string socketPath = "/tmp/calculator_pipe"; + auto transport = std::make_unique(socketPath, false); + Client client(std::move(transport)); + client.Start(); + + Json addRes = client.SendMethodCall("add", Json({{"a", 10}, {"b", 5}})); + spdlog::info("Add result: {}", addRes.dump()); + + Json divRes = client.SendMethodCall("divide", Json({{"a", 10}, {"b", 0}})); + spdlog::info("Divide result: {}", divRes.dump()); + + client.SendNotification("stop"); + + client.Stop(); + return 0; +} diff --git a/examples/pipe_server.cpp b/examples/pipe_server.cpp new file mode 100644 index 0000000..7065823 --- /dev/null +++ b/examples/pipe_server.cpp @@ -0,0 +1,42 @@ +#include + +#include +#include +#include +#include +#include + +#include "calculator.hpp" + +using namespace jsonrpc::server; +using namespace jsonrpc::transport; +using Json = nlohmann::json; + +int main() { + auto logger = spdlog::basic_logger_mt("server", "logs/server.log", true); + spdlog::set_default_logger(logger); + spdlog::set_level(spdlog::level::debug); + spdlog::flush_on(spdlog::level::debug); + + std::string socketPath = "/tmp/calculator_pipe"; + + auto transport = std::make_unique(socketPath, true); + Server server(std::move(transport)); + Calculator calculator; + + server.RegisterMethodCall( + "add", [&calculator](const std::optional ¶ms) { + return calculator.Add(params.value()); + }); + + server.RegisterMethodCall( + "divide", [&calculator](const std::optional ¶ms) { + return calculator.Divide(params.value()); + }); + + server.RegisterNotification( + "stop", [&server](const std::optional &) { server.Stop(); }); + + server.Start(); + return 0; +} diff --git a/examples/stdio_client.cpp b/examples/stdio_client.cpp index db6fb92..568a76b 100644 --- a/examples/stdio_client.cpp +++ b/examples/stdio_client.cpp @@ -7,17 +7,18 @@ #include #include -using namespace jsonrpc; +using namespace jsonrpc::client; +using namespace jsonrpc::transport; using Json = nlohmann::json; int main() { - auto logger = spdlog::basic_logger_mt("client_logger", "logs/client.log"); + auto logger = spdlog::basic_logger_mt("client", "logs/client.log"); spdlog::set_default_logger(logger); spdlog::set_level(spdlog::level::debug); spdlog::flush_on(spdlog::level::debug); - auto transport = std::make_unique(); - client::Client client(std::move(transport)); + auto transport = std::make_unique(); + Client client(std::move(transport)); client.Start(); Json addRes = client.SendMethodCall("add", Json({{"a", 10}, {"b", 5}})); diff --git a/examples/stdio_server.cpp b/examples/stdio_server.cpp index 61f6347..362ffae 100644 --- a/examples/stdio_server.cpp +++ b/examples/stdio_server.cpp @@ -8,17 +8,18 @@ #include "calculator.hpp" -using namespace jsonrpc; +using namespace jsonrpc::server; +using namespace jsonrpc::transport; using Json = nlohmann::json; int main() { - auto logger = spdlog::basic_logger_mt("server_logger", "logs/server.log"); + auto logger = spdlog::basic_logger_mt("server", "logs/server.log"); spdlog::set_default_logger(logger); spdlog::set_level(spdlog::level::debug); spdlog::flush_on(spdlog::level::debug); - auto transport = std::make_unique(); - server::Server server(std::move(transport)); + auto transport = std::make_unique(); + Server server(std::move(transport)); Calculator calculator; server.RegisterMethodCall( @@ -31,10 +32,8 @@ int main() { return calculator.Divide(params.value()); }); - server.RegisterNotification("stop", [&server](const std::optional &) { - spdlog::info("Server Received stop notification"); - server.Stop(); - }); + server.RegisterNotification( + "stop", [&server](const std::optional &) { server.Stop(); }); server.Start(); return 0; diff --git a/include/jsonrpc/transport/framed_pipe_transport.hpp b/include/jsonrpc/transport/framed_pipe_transport.hpp new file mode 100644 index 0000000..8ae6310 --- /dev/null +++ b/include/jsonrpc/transport/framed_pipe_transport.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include + +#include "jsonrpc/transport/framed_transport.hpp" +#include "jsonrpc/transport/pipe_transport.hpp" +#include "jsonrpc/transport/transport.hpp" + +namespace jsonrpc { +namespace transport { + +/** + * @brief Transport layer using Asio Unix domain sockets for JSON-RPC + * communication with framing. + */ +class FramedPipeTransport : public PipeTransport, protected FramedTransport { +public: + FramedPipeTransport(const std::string &socketPath, bool isServer); + + void SendMessage(const std::string &message) override; + std::string ReceiveMessage() override; +}; + +} // namespace transport +} // namespace jsonrpc diff --git a/include/jsonrpc/transport/framed_transport.hpp b/include/jsonrpc/transport/framed_transport.hpp index 5229b0d..424e914 100644 --- a/include/jsonrpc/transport/framed_transport.hpp +++ b/include/jsonrpc/transport/framed_transport.hpp @@ -1,17 +1,28 @@ #pragma once +#include +#include #include +#include namespace jsonrpc { namespace transport { +class FramedTransportTest; + /** * @brief Base class for framed transport mechanisms. * - * Provides basic functionality for sending and receiving framed messages. + * Provides modular functionality for sending and receiving framed messages. */ class FramedTransport { + /// @brief A map of headers to their values. + using HeaderMap = std::unordered_map; + protected: + /// @brief The delimiter used to separate headers from the message content. + static constexpr const char *HEADER_DELIMITER = "\r\n\r\n"; + /** * @brief Constructs a framed message. * @@ -23,6 +34,18 @@ class FramedTransport { */ void FrameMessage(std::ostream &output, const std::string &message); + HeaderMap ReadHeadersFromStream(std::istream &input); + int ReadContentLengthFromStream(std::istream &input); + + /** + * @brief Reads content from the input stream based on the content length. + * + * @param input The input stream to read the content from. + * @param content_length The length of the content to be read. + * @return The content as a string. + */ + std::string ReadContent(std::istream &input, int content_length); + /** * @brief Receives a framed message. * @@ -30,9 +53,9 @@ class FramedTransport { * content based on that length. * * @param input The input stream to read the framed message. - * @return The received message. + * @return The received message content. */ - std::string DeframeMessage(std::istream &input); + std::string ReceiveFramedMessage(std::istream &input); private: /** @@ -41,7 +64,9 @@ class FramedTransport { * @param header_value The header value containing the content length. * @return The parsed content length. */ - int parseContentLength(const std::string &header_value); + int ParseContentLength(const std::string &header_value); + + friend class FramedTransportTest; }; } // namespace transport diff --git a/include/jsonrpc/transport/pipe_transport.hpp b/include/jsonrpc/transport/pipe_transport.hpp new file mode 100644 index 0000000..6678e16 --- /dev/null +++ b/include/jsonrpc/transport/pipe_transport.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +#include "jsonrpc/transport/transport.hpp" + +namespace jsonrpc { +namespace transport { + +/** + * @brief Transport implementation using Unix domain sockets. + * + * This class provides transport functionality over Unix domain sockets, + * supporting both client and server modes for inter-process communication + * on the same machine. + */ +class PipeTransport : public Transport { +public: + /** + * @brief Constructs a PipeTransport. + * @param socketPath Path to the Unix domain socket. + * @param isServer True if the transport acts as a server; false if it acts as + * a client. + */ + PipeTransport(const std::string &socketPath, bool isServer); + + ~PipeTransport(); + + void SendMessage(const std::string &message) override; + std::string ReceiveMessage() override; + +protected: + asio::local::stream_protocol::socket &GetSocket(); + +private: + void RemoveExistingSocketFile(); + void Connect(); + void BindAndListen(); + + asio::io_context ioContext_; + asio::local::stream_protocol::socket socket_; + std::string socketPath_; + bool isServer_; +}; + +} // namespace transport +} // namespace jsonrpc diff --git a/include/jsonrpc/utils/string_utils.hpp b/include/jsonrpc/utils/string_utils.hpp new file mode 100644 index 0000000..ed64f60 --- /dev/null +++ b/include/jsonrpc/utils/string_utils.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace jsonrpc { +namespace utils { + +std::string trim(std::string_view in) { + auto view = in | std::views::drop_while(::isspace) | std::views::reverse | + std::views::drop_while(::isspace) | std::views::reverse; + return {view.begin(), view.end()}; +} + +} // namespace utils +} // namespace jsonrpc diff --git a/src/BUILD.bazel b/src/BUILD.bazel index 2ed05de..b7072a5 100644 --- a/src/BUILD.bazel +++ b/src/BUILD.bazel @@ -6,6 +6,7 @@ cc_library( linkopts = ["-pthread"], visibility = ["//visibility:public"], deps = [ + "@asio", "@nlohmann_json//:json", "@spdlog", "@thread_pool", diff --git a/src/client/client.cpp b/src/client/client.cpp index 5e8f64a..abd6b01 100644 --- a/src/client/client.cpp +++ b/src/client/client.cpp @@ -34,6 +34,7 @@ bool Client::HasPendingRequests() const { } void Client::Listener() { + spdlog::info("Starting JSON-RPC client listener thread"); while (running_) { if (expectedResponses_ > 0) { std::string response = transport_->ReceiveMessage(); diff --git a/src/server/server.cpp b/src/server/server.cpp index 43e872c..25d6780 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -38,6 +38,9 @@ void Server::Listen() { while (IsRunning()) { std::string request = transport_->ReceiveMessage(); + if (request.empty()) { + continue; + } std::optional response = dispatcher_->DispatchRequest(request); if (response.has_value()) { transport_->SendMessage(response.value()); diff --git a/src/transport/framed_pipe_transport.cpp b/src/transport/framed_pipe_transport.cpp new file mode 100644 index 0000000..82675cf --- /dev/null +++ b/src/transport/framed_pipe_transport.cpp @@ -0,0 +1,74 @@ +#include "jsonrpc/transport/framed_pipe_transport.hpp" + +#include +#include + +#include + +namespace jsonrpc { +namespace transport { + +FramedPipeTransport::FramedPipeTransport( + const std::string &socketPath, bool isServer) + : PipeTransport(socketPath, isServer) { + spdlog::info( + "FramedPipeTransport initialized with socket path: {}", socketPath); +} + +void FramedPipeTransport::SendMessage(const std::string &message) { + try { + asio::streambuf messageBuf; + std::ostream messageStream(&messageBuf); + FrameMessage(messageStream, message); + + asio::error_code ec; + std::size_t bytesWritten = asio::write(GetSocket(), messageBuf.data(), ec); + + if (ec) { + throw std::runtime_error("Error sending message: " + ec.message()); + } + + spdlog::info( + "FramedPipeTransport sent message with {} bytes", bytesWritten); + } catch (const std::exception &e) { + spdlog::error("FramedPipeTransport failed to send message: {}", e.what()); + throw; + } +} + +std::string FramedPipeTransport::ReceiveMessage() { + asio::streambuf buffer; + asio::error_code ec; + + // Read headers until \r\n\r\n delimiter + asio::read_until(GetSocket(), buffer, HEADER_DELIMITER, ec); + if (ec) { + throw std::runtime_error("Failed to read message headers: " + ec.message()); + } + + std::istream headerStream(&buffer); + + // Extract content length from the headers + int contentLength = ReadContentLengthFromStream(headerStream); + + // Calculate how much more content we need to read + std::size_t remainingContentLength = contentLength - buffer.size(); + + // Read any remaining content directly into the buffer + if (remainingContentLength > 0) { + asio::read(GetSocket(), buffer.prepare(remainingContentLength), ec); + if (ec && ec != asio::error::eof) { + throw std::runtime_error( + "Failed to read message content: " + ec.message()); + } + buffer.commit(remainingContentLength); + } + + // Convert the entire buffer to a string + std::string content( + asio::buffers_begin(buffer.data()), asio::buffers_end(buffer.data())); + return content; +} + +} // namespace transport +} // namespace jsonrpc diff --git a/src/transport/framed_stdio_transport.cpp b/src/transport/framed_stdio_transport.cpp index 42695e9..36e2d20 100644 --- a/src/transport/framed_stdio_transport.cpp +++ b/src/transport/framed_stdio_transport.cpp @@ -17,7 +17,7 @@ void FramedStdioTransport::SendMessage(const std::string &message) { } std::string FramedStdioTransport::ReceiveMessage() { - std::string response = DeframeMessage(std::cin); + std::string response = ReceiveFramedMessage(std::cin); spdlog::debug("FramedStdioTransport received message: {}", response); return response; } diff --git a/src/transport/framed_transport.cpp b/src/transport/framed_transport.cpp index ba06ac8..9c2802f 100644 --- a/src/transport/framed_transport.cpp +++ b/src/transport/framed_transport.cpp @@ -1,67 +1,53 @@ #include "jsonrpc/transport/framed_transport.hpp" -#include #include +#include -#include +#include "jsonrpc/utils/string_utils.hpp" namespace jsonrpc { namespace transport { void FramedTransport::FrameMessage( std::ostream &output, const std::string &message) { - spdlog::debug("FramedTransport framing message: {}", message); output << "Content-Length: " << message.size() << "\r\n" << "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n" << "\r\n" << message; } -int FramedTransport::parseContentLength(const std::string &header_value) { - spdlog::debug("Parsing Content-Length value: {}", header_value); - try { - return std::stoi(header_value); - } catch (const std::invalid_argument &) { - spdlog::error("Invalid Content-Length value: {}", header_value); - throw std::runtime_error("Invalid Content-Length value"); - } catch (const std::out_of_range &) { - spdlog::error("Content-Length value out of range: {}", header_value); - throw std::runtime_error("Content-Length value out of range"); - } -} - -std::string FramedTransport::DeframeMessage(std::istream &input) { - spdlog::info("FramedTransport deframing message"); +FramedTransport::HeaderMap FramedTransport::ReadHeadersFromStream( + std::istream &input) { + HeaderMap headers; std::string line; - std::string headers; - int content_length = 0; - bool content_length_found = false; - // Read headers while (std::getline(input, line) && !line.empty() && line != "\r") { - headers += line + "\n"; - std::istringstream line_stream(line); - std::string header_key; - if (std::getline(line_stream, header_key, ':')) { - std::string header_value; - if (std::getline(line_stream >> std::ws, header_value)) { - spdlog::debug("Parsed header: {}: {}", header_key, header_value); - if (header_key == "Content-Length") { - content_length = parseContentLength(header_value); - content_length_found = true; - } - } + auto colonPos = line.find(':'); + if (colonPos != std::string::npos) { + std::string header_key = utils::trim(line.substr(0, colonPos)); + std::string header_value = utils::trim(line.substr(colonPos + 1)); + headers[header_key] = header_value; } } - if (!content_length_found) { - spdlog::error("Content-Length header missing"); + if (headers.empty()) { + throw std::runtime_error("Failed to read headers"); + } + + return headers; +} + +int FramedTransport::ReadContentLengthFromStream(std::istream &input) { + auto headers = ReadHeadersFromStream(input); + auto it = headers.find("Content-Length"); + if (it == headers.end()) { throw std::runtime_error("Content-Length header missing"); } + return ParseContentLength(it->second); +} - // Read content - spdlog::debug( - "FramedTransport reading content of length: {}", content_length); +std::string FramedTransport::ReadContent( + std::istream &input, int content_length) { std::string content(content_length, '\0'); input.read(&content[0], content_length); if (input.gcount() != content_length) { @@ -70,5 +56,20 @@ std::string FramedTransport::DeframeMessage(std::istream &input) { return content; } +std::string FramedTransport::ReceiveFramedMessage(std::istream &input) { + int content_length = ReadContentLengthFromStream(input); + return ReadContent(input, content_length); +} + +int FramedTransport::ParseContentLength(const std::string &header_value) { + try { + return std::stoi(header_value); + } catch (const std::invalid_argument &) { + throw std::runtime_error("Invalid Content-Length value"); + } catch (const std::out_of_range &) { + throw std::runtime_error("Content-Length value out of range"); + } +} + } // namespace transport } // namespace jsonrpc diff --git a/src/transport/pipe_transport.cpp b/src/transport/pipe_transport.cpp new file mode 100644 index 0000000..74b82ee --- /dev/null +++ b/src/transport/pipe_transport.cpp @@ -0,0 +1,97 @@ +#include "jsonrpc/transport/pipe_transport.hpp" + +#include +#include + +#include + +namespace jsonrpc { +namespace transport { + +PipeTransport::PipeTransport(const std::string &socketPath, bool isServer) + : socket_(ioContext_), socketPath_(socketPath), isServer_(isServer) { + spdlog::info("Initializing PipeTransport with socket path: {}. IsServer: {}", + socketPath, isServer); + + if (isServer_) { + RemoveExistingSocketFile(); + BindAndListen(); + } else { + Connect(); + } +} + +asio::local::stream_protocol::socket &PipeTransport::GetSocket() { + return socket_; +} + +PipeTransport::~PipeTransport() { + spdlog::info("Closing socket and shutting down PipeTransport."); + socket_.close(); + ioContext_.stop(); +} + +void PipeTransport::RemoveExistingSocketFile() { + if (unlink(socketPath_.c_str()) == 0) { + spdlog::info("Removed existing socket file: {}", socketPath_); + } else if (errno != ENOENT) { + spdlog::error("Failed to remove existing socket file: {}. Error: {}", + socketPath_, strerror(errno)); + throw std::runtime_error("Failed to remove existing socket file."); + } +} + +void PipeTransport::Connect() { + try { + asio::local::stream_protocol::endpoint endpoint(socketPath_); + socket_.connect(endpoint); + spdlog::info("Connected to socket at path: {}", socketPath_); + } catch (const std::exception &e) { + spdlog::error("Error connecting to socket: {}", e.what()); + throw std::runtime_error("Error connecting to socket"); + } +} + +void PipeTransport::BindAndListen() { + try { + asio::local::stream_protocol::acceptor acceptor( + ioContext_, asio::local::stream_protocol::endpoint(socketPath_)); + acceptor.listen(); + spdlog::info("Listening on socket path: {}", socketPath_); + acceptor.accept(socket_); + spdlog::info("Accepted connection on socket path: {}", socketPath_); + } catch (const std::exception &e) { + spdlog::error("Error binding/listening on socket: {}", e.what()); + throw std::runtime_error("Error binding/listening on socket"); + } +} + +void PipeTransport::SendMessage(const std::string &message) { + try { + std::string fullMessage = message + "\n"; + asio::write(socket_, asio::buffer(fullMessage)); + spdlog::debug("Sent message: {}", message); + } catch (const std::exception &e) { + spdlog::error("Error sending message: {}", e.what()); + throw std::runtime_error("Error sending message"); + } +} + +std::string PipeTransport::ReceiveMessage() { + try { + asio::streambuf buffer; + asio::read_until(socket_, buffer, '\n'); + std::istream is(&buffer); + std::string message; + std::getline(is, message); + spdlog::debug("Received message: {}", message); + return message; + } catch (const std::exception &e) { + spdlog::error("Error receiving message: {}", e.what()); + socket_.close(); + return ""; + } +} + +} // namespace transport +} // namespace jsonrpc diff --git a/src/transport/stdio_transport.cpp b/src/transport/stdio_transport.cpp index a12a855..82b2783 100644 --- a/src/transport/stdio_transport.cpp +++ b/src/transport/stdio_transport.cpp @@ -14,7 +14,9 @@ void StdioTransport::SendMessage(const std::string &message) { std::string StdioTransport::ReceiveMessage() { std::string response; - std::getline(std::cin, response); + if (!std::getline(std::cin, response)) { + throw std::runtime_error("Failed to receive message"); + } spdlog::debug("StdioTransport received response: {}", response); return response; } diff --git a/tests/BUILD.bazel b/tests/BUILD.bazel index 71a7ce9..283f218 100644 --- a/tests/BUILD.bazel +++ b/tests/BUILD.bazel @@ -81,3 +81,13 @@ cc_test( "@catch2//:catch2_main", ], ) + +cc_test( + name = "test_pipe_transport", + size = "small", + srcs = ["transports/test_pipe_transport.cpp"], + deps = [ + "//src:jsonrpc_lib", + "@catch2//:catch2_main", + ], +) diff --git a/tests/transports/test_framed_transport.cpp b/tests/transports/test_framed_transport.cpp index 36d709f..d7fe86c 100644 --- a/tests/transports/test_framed_transport.cpp +++ b/tests/transports/test_framed_transport.cpp @@ -1,4 +1,8 @@ +#include #include +#include +#include +#include #include #include @@ -7,79 +11,117 @@ using namespace jsonrpc::transport; -class MockFramedTransport : public FramedTransport { +namespace jsonrpc { +namespace transport { + +class FramedTransportTest : public FramedTransport { public: - using FramedTransport::DeframeMessage; using FramedTransport::FrameMessage; + using FramedTransport::ReadContent; + using FramedTransport::ReadContentLengthFromStream; + using FramedTransport::ReadHeadersFromStream; + using FramedTransport::ReceiveFramedMessage; + + int TestParseContentLength(const std::string &header_value) { + return ParseContentLength(header_value); + } }; -TEST_CASE("FramedTransport frames a message correctly", "[FramedTransport]") { +} // namespace transport +} // namespace jsonrpc + +TEST_CASE("FramedTransport correctly frames a message", "[FramedTransport]") { + FramedTransportTest transport; std::ostringstream output; - std::string message = R"({"jsonrpc": "2.0", "method": "example"})"; + std::string message = R"({"jsonrpc":"2.0","method":"testMethod"})"; - MockFramedTransport transport; transport.FrameMessage(output, message); - std::string expected_output = + std::string expectedOutput = "Content-Length: 39\r\n" "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n" "\r\n" + message; - REQUIRE(output.str() == expected_output); + REQUIRE(output.str() == expectedOutput); } -TEST_CASE("FramedTransport deframes a message correctly", "[FramedTransport]") { - std::string framed_message = - "Content-Length: 39\r\n" - "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n" - "\r\n" - R"({"jsonrpc": "2.0", "method": "example"})"; - std::istringstream input(framed_message); +TEST_CASE("FramedTransport parses headers correctly", "[FramedTransport]") { + FramedTransportTest transport; + std::string headerString = + "Content-Length: 37\r\nContent-Type: " + "application/vscode-jsonrpc; charset=utf-8\r\n\r\n"; + std::istringstream headerStream(headerString); + + auto headers = transport.ReadHeadersFromStream(headerStream); - MockFramedTransport transport; - std::string message = transport.DeframeMessage(input); + REQUIRE(headers.size() == 2); + REQUIRE(headers["Content-Length"] == "37"); + REQUIRE( + headers["Content-Type"] == "application/vscode-jsonrpc; charset=utf-8"); +} + +TEST_CASE( + "FramedTransport returns correct content length", "[FramedTransport]") { + FramedTransportTest transport; + std::string headerString = + "Content-Length: 37\r\nContent-Type: " + "application/vscode-jsonrpc; charset=utf-8\r\n\r\n"; + std::istringstream headerStream(headerString); + + int contentLength = transport.ReadContentLengthFromStream(headerStream); - REQUIRE(message == R"({"jsonrpc": "2.0", "method": "example"})"); + REQUIRE(contentLength == 37); } -TEST_CASE("FramedTransport throws when Content-Length header is missing", +TEST_CASE("FramedTransport reads correct content", "[FramedTransport]") { + FramedTransportTest transport; + std::string content = R"({"jsonrpc":"2.0","method":"testMethod"})"; + std::istringstream input(content); + + std::string result = transport.ReadContent(input, content.size()); + + REQUIRE(result == content); +} + +TEST_CASE( + "FramedTransport correctly receives framed message", "[FramedTransport]") { + FramedTransportTest transport; + std::string framedMessage = + "Content-Length: 39\r\nContent-Type: application/vscode-jsonrpc; " + "charset=utf-8\r\n\r\n" + R"({"jsonrpc":"2.0","method":"testMethod"})"; + std::istringstream input(framedMessage); + + std::string result = transport.ReceiveFramedMessage(input); + + REQUIRE(result == R"({"jsonrpc":"2.0","method":"testMethod"})"); +} + +TEST_CASE("FramedTransport throws error on invalid content length", "[FramedTransport]") { - std::string framed_message = - "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n" - "\r\n" - R"({"jsonrpc": "2.0", "method": "example"})"; - std::istringstream input(framed_message); + FramedTransportTest transport; + std::istringstream input("Content-Length: invalid\r\n\r\n"); - MockFramedTransport transport; - REQUIRE_THROWS_WITH( - transport.DeframeMessage(input), "Content-Length header missing"); + REQUIRE_THROWS_WITH(transport.ReadContentLengthFromStream(input), + "Invalid Content-Length value"); } -TEST_CASE("FramedTransport throws on invalid Content-Length value", +TEST_CASE("FramedTransport throws error on missing Content-Length", "[FramedTransport]") { - std::string framed_message = - "Content-Length: abc\r\n" - "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n" - "\r\n" - R"({"jsonrpc": "2.0", "method": "example"})"; - std::istringstream input(framed_message); + FramedTransportTest transport; + std::istringstream input( + "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n\r\n"); - MockFramedTransport transport; - REQUIRE_THROWS_WITH( - transport.DeframeMessage(input), "Invalid Content-Length value"); + REQUIRE_THROWS_WITH(transport.ReadContentLengthFromStream(input), + "Content-Length header missing"); } -TEST_CASE("FramedTransport throws when content read is incomplete", +TEST_CASE("FramedTransport throws error on out of range content length", "[FramedTransport]") { - std::string framed_message = - "Content-Length: 50\r\n" - "Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n" - "\r\n" - R"({"jsonrpc": "2.0", "method": "example"})"; - std::istringstream input(framed_message); + FramedTransportTest transport; + std::istringstream input("Content-Length: 9999999999999999999999\r\n\r\n"); - MockFramedTransport transport; - REQUIRE_THROWS_WITH(transport.DeframeMessage(input), - "Failed to read the expected content length"); + REQUIRE_THROWS_WITH(transport.ReadContentLengthFromStream(input), + "Content-Length value out of range"); } diff --git a/tests/transports/test_pipe_transport.cpp b/tests/transports/test_pipe_transport.cpp new file mode 100644 index 0000000..0ddaa7d --- /dev/null +++ b/tests/transports/test_pipe_transport.cpp @@ -0,0 +1,68 @@ +#include + +#include +#include + +#include "jsonrpc/transport/pipe_transport.hpp" + +TEST_CASE( + "PipeTransport starts server and client communication", "[PipeTransport]") { + std::string socketPath = "/tmp/test_socket"; + + // Start the server in a separate thread + std::thread serverThread([&]() { + jsonrpc::transport::PipeTransport serverTransport(socketPath, true); + + // Wait for a message from the client + std::string receivedMessage = serverTransport.ReceiveMessage(); + REQUIRE(receivedMessage == "Hello, Server!"); + + // Send a response back to the client + serverTransport.SendMessage("Hello, Client!"); + }); + + // Give the server some time to start + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Start the client and connect to the server + jsonrpc::transport::PipeTransport clientTransport(socketPath, false); + + // Send a message to the server + clientTransport.SendMessage("Hello, Server!"); + + // Wait for a response from the server + std::string response = clientTransport.ReceiveMessage(); + REQUIRE(response == "Hello, Client!"); + + serverThread.join(); +} + +TEST_CASE("PipeTransport handles empty message correctly", "[PipeTransport]") { + std::string socketPath = "/tmp/test_socket_empty"; + + // Start the server in a separate thread + std::thread serverThread([&]() { + jsonrpc::transport::PipeTransport serverTransport(socketPath, true); + + // Send an empty message to the client + serverTransport.SendMessage(""); + }); + + // Give the server some time to start + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Start the client and connect to the server + jsonrpc::transport::PipeTransport clientTransport(socketPath, false); + + // Wait for the empty response from the server + std::string response = clientTransport.ReceiveMessage(); + REQUIRE(response.empty()); + + serverThread.join(); +} + +TEST_CASE("PipeTransport throws on invalid socket path", "[PipeTransport]") { + REQUIRE_THROWS_WITH( + jsonrpc::transport::PipeTransport("/tmp/non_existent_socket", false), + "Error connecting to socket"); +}