diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 61b5e2e82e..bb45d6a07d 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -22,6 +22,7 @@ #include #include // NOLINT[build/include_order] +#include "rosidl_runtime_cpp/traits.hpp" #include "tracetools/tracetools.h" #include "tracetools/utils.hpp" @@ -29,6 +30,9 @@ #include "rclcpp/detail/subscription_callback_type_helper.hpp" #include "rclcpp/function_traits.hpp" #include "rclcpp/message_info.hpp" +#include "rclcpp/serialized_message.hpp" +#include "rclcpp/type_adapter.hpp" + template inline constexpr bool always_false_v = false; @@ -42,53 +46,189 @@ namespace detail template struct MessageDeleterHelper { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAlloc = typename MessageAllocTraits::allocator_type; - using MessageDeleter = allocator::Deleter; + using AllocTraits = allocator::AllocRebind; + using Alloc = typename AllocTraits::allocator_type; + using Deleter = allocator::Deleter; }; +/// Struct which contains all possible callback signatures, with or without a TypeAdapter.s template -struct AnySubscriptionCallbackHelper +struct AnySubscriptionCallbackPossibleTypes { - using MessageDeleter = typename MessageDeleterHelper::MessageDeleter; + /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. + using SubscribedType = typename rclcpp::TypeAdapter::custom_type; + /// MessageT::ros_message_type if MessageT is a TypeAdapter, otherwise just MessageT. + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + using SubscribedMessageDeleter = + typename MessageDeleterHelper::Deleter; + using ROSMessageDeleter = + typename MessageDeleterHelper::Deleter; + using SerializedMessageDeleter = + typename MessageDeleterHelper::Deleter; using ConstRefCallback = - std::function; + std::function; + using ConstRefROSMessageCallback = + std::function; using ConstRefWithInfoCallback = - std::function; + std::function; + using ConstRefWithInfoROSMessageCallback = + std::function; + using ConstRefSerializedMessageCallback = + std::function; + using ConstRefSerializedMessageWithInfoCallback = + std::function; using UniquePtrCallback = - std::function)>; + std::function)>; + using UniquePtrROSMessageCallback = + std::function)>; using UniquePtrWithInfoCallback = - std::function, const rclcpp::MessageInfo &)>; + std::function, + const rclcpp::MessageInfo &)>; + using UniquePtrWithInfoROSMessageCallback = + std::function, + const rclcpp::MessageInfo &)>; + using UniquePtrSerializedMessageCallback = + std::function)>; + using UniquePtrSerializedMessageWithInfoCallback = + std::function, + const rclcpp::MessageInfo &)>; using SharedConstPtrCallback = - std::function)>; + std::function)>; + using SharedConstPtrROSMessageCallback = + std::function)>; using SharedConstPtrWithInfoCallback = - std::function, const rclcpp::MessageInfo &)>; + std::function, + const rclcpp::MessageInfo &)>; + using SharedConstPtrWithInfoROSMessageCallback = + std::function, + const rclcpp::MessageInfo &)>; + using SharedConstPtrSerializedMessageCallback = + std::function)>; + using SharedConstPtrSerializedMessageWithInfoCallback = + std::function, + const rclcpp::MessageInfo &)>; using ConstRefSharedConstPtrCallback = - std::function &)>; + std::function &)>; + using ConstRefSharedConstPtrROSMessageCallback = + std::function &)>; using ConstRefSharedConstPtrWithInfoCallback = - std::function &, const rclcpp::MessageInfo &)>; + std::function &, + const rclcpp::MessageInfo &)>; + using ConstRefSharedConstPtrWithInfoROSMessageCallback = + std::function &, + const rclcpp::MessageInfo &)>; + using ConstRefSharedConstPtrSerializedMessageCallback = + std::function &)>; + using ConstRefSharedConstPtrSerializedMessageWithInfoCallback = + std::function &, + const rclcpp::MessageInfo &)>; // Deprecated signatures: using SharedPtrCallback = - std::function)>; + std::function)>; + using SharedPtrROSMessageCallback = + std::function)>; using SharedPtrWithInfoCallback = - std::function, const rclcpp::MessageInfo &)>; + std::function, const rclcpp::MessageInfo &)>; + using SharedPtrWithInfoROSMessageCallback = + std::function, + const rclcpp::MessageInfo &)>; + using SharedPtrSerializedMessageCallback = + std::function)>; + using SharedPtrSerializedMessageWithInfoCallback = + std::function, const rclcpp::MessageInfo &)>; +}; + +/// Template helper to select the variant type based on whether or not MessageT is a TypeAdapter. +template< + typename MessageT, + typename AllocatorT, + bool is_adapted_type = rclcpp::TypeAdapter::is_specialized::value +> +struct AnySubscriptionCallbackHelper; + +/// Specialization for when MessageT is not a TypeAdapter. +template +struct AnySubscriptionCallbackHelper +{ + using CallbackTypes = AnySubscriptionCallbackPossibleTypes; using variant_type = std::variant< - ConstRefCallback, - ConstRefWithInfoCallback, - UniquePtrCallback, - UniquePtrWithInfoCallback, - SharedConstPtrCallback, - SharedConstPtrWithInfoCallback, - ConstRefSharedConstPtrCallback, - ConstRefSharedConstPtrWithInfoCallback, - SharedPtrCallback, - SharedPtrWithInfoCallback + typename CallbackTypes::ConstRefCallback, + typename CallbackTypes::ConstRefWithInfoCallback, + typename CallbackTypes::ConstRefSerializedMessageCallback, + typename CallbackTypes::ConstRefSerializedMessageWithInfoCallback, + typename CallbackTypes::UniquePtrCallback, + typename CallbackTypes::UniquePtrWithInfoCallback, + typename CallbackTypes::UniquePtrSerializedMessageCallback, + typename CallbackTypes::UniquePtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedConstPtrCallback, + typename CallbackTypes::SharedConstPtrWithInfoCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::ConstRefSharedConstPtrCallback, + typename CallbackTypes::ConstRefSharedConstPtrWithInfoCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedPtrCallback, + typename CallbackTypes::SharedPtrWithInfoCallback, + typename CallbackTypes::SharedPtrSerializedMessageCallback, + typename CallbackTypes::SharedPtrSerializedMessageWithInfoCallback + >; +}; + +/// Specialization for when MessageT is a TypeAdapter. +template +struct AnySubscriptionCallbackHelper +{ + using CallbackTypes = AnySubscriptionCallbackPossibleTypes; + + using variant_type = std::variant< + typename CallbackTypes::ConstRefCallback, + typename CallbackTypes::ConstRefROSMessageCallback, + typename CallbackTypes::ConstRefWithInfoCallback, + typename CallbackTypes::ConstRefWithInfoROSMessageCallback, + typename CallbackTypes::ConstRefSerializedMessageCallback, + typename CallbackTypes::ConstRefSerializedMessageWithInfoCallback, + typename CallbackTypes::UniquePtrCallback, + typename CallbackTypes::UniquePtrROSMessageCallback, + typename CallbackTypes::UniquePtrWithInfoCallback, + typename CallbackTypes::UniquePtrWithInfoROSMessageCallback, + typename CallbackTypes::UniquePtrSerializedMessageCallback, + typename CallbackTypes::UniquePtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedConstPtrCallback, + typename CallbackTypes::SharedConstPtrROSMessageCallback, + typename CallbackTypes::SharedConstPtrWithInfoCallback, + typename CallbackTypes::SharedConstPtrWithInfoROSMessageCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::ConstRefSharedConstPtrCallback, + typename CallbackTypes::ConstRefSharedConstPtrROSMessageCallback, + typename CallbackTypes::ConstRefSharedConstPtrWithInfoCallback, + typename CallbackTypes::ConstRefSharedConstPtrWithInfoROSMessageCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedPtrCallback, + typename CallbackTypes::SharedPtrROSMessageCallback, + typename CallbackTypes::SharedPtrWithInfoCallback, + typename CallbackTypes::SharedPtrWithInfoROSMessageCallback, + typename CallbackTypes::SharedPtrSerializedMessageCallback, + typename CallbackTypes::SharedPtrSerializedMessageWithInfoCallback >; }; @@ -101,49 +241,124 @@ template< class AnySubscriptionCallback { private: + /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. + using SubscribedType = typename rclcpp::TypeAdapter::custom_type; + /// MessageT::ros_message_type if MessageT is a TypeAdapter, otherwise just MessageT. + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + using HelperT = typename rclcpp::detail::AnySubscriptionCallbackHelper; - using MessageDeleterHelper = rclcpp::detail::MessageDeleterHelper; - using MessageAllocTraits = typename MessageDeleterHelper::MessageAllocTraits; - using MessageAlloc = typename MessageDeleterHelper::MessageAlloc; - using MessageDeleter = typename MessageDeleterHelper::MessageDeleter; + using SubscribedTypeDeleterHelper = + rclcpp::detail::MessageDeleterHelper; + using SubscribedTypeAllocatorTraits = typename SubscribedTypeDeleterHelper::AllocTraits; + using SubscribedTypeAllocator = typename SubscribedTypeDeleterHelper::Alloc; + using SubscribedTypeDeleter = typename SubscribedTypeDeleterHelper::Deleter; - // See AnySubscriptionCallbackHelper for the types of these. - using ConstRefCallback = typename HelperT::ConstRefCallback; - using ConstRefWithInfoCallback = typename HelperT::ConstRefWithInfoCallback; + using ROSMessageTypeDeleterHelper = + rclcpp::detail::MessageDeleterHelper; + using ROSMessageTypeAllocatorTraits = typename ROSMessageTypeDeleterHelper::AllocTraits; + using ROSMessageTypeAllocator = typename ROSMessageTypeDeleterHelper::Alloc; + using ROSMessageTypeDeleter = typename ROSMessageTypeDeleterHelper::Deleter; - using UniquePtrCallback = typename HelperT::UniquePtrCallback; - using UniquePtrWithInfoCallback = typename HelperT::UniquePtrWithInfoCallback; + using SerializedMessageDeleterHelper = + rclcpp::detail::MessageDeleterHelper; + using SerializedMessageAllocatorTraits = typename SerializedMessageDeleterHelper::AllocTraits; + using SerializedMessageAllocator = typename SerializedMessageDeleterHelper::Alloc; + using SerializedMessageDeleter = typename SerializedMessageDeleterHelper::Deleter; - using SharedConstPtrCallback = typename HelperT::SharedConstPtrCallback; - using SharedConstPtrWithInfoCallback = typename HelperT::SharedConstPtrWithInfoCallback; + // See AnySubscriptionCallbackPossibleTypes for the types of these. + using CallbackTypes = detail::AnySubscriptionCallbackPossibleTypes; + using ConstRefCallback = + typename CallbackTypes::ConstRefCallback; + using ConstRefROSMessageCallback = + typename CallbackTypes::ConstRefROSMessageCallback; + using ConstRefWithInfoCallback = + typename CallbackTypes::ConstRefWithInfoCallback; + using ConstRefWithInfoROSMessageCallback = + typename CallbackTypes::ConstRefWithInfoROSMessageCallback; + using ConstRefSerializedMessageCallback = + typename CallbackTypes::ConstRefSerializedMessageCallback; + using ConstRefSerializedMessageWithInfoCallback = + typename CallbackTypes::ConstRefSerializedMessageWithInfoCallback; + using UniquePtrCallback = + typename CallbackTypes::UniquePtrCallback; + using UniquePtrROSMessageCallback = + typename CallbackTypes::UniquePtrROSMessageCallback; + using UniquePtrWithInfoCallback = + typename CallbackTypes::UniquePtrWithInfoCallback; + using UniquePtrWithInfoROSMessageCallback = + typename CallbackTypes::UniquePtrWithInfoROSMessageCallback; + using UniquePtrSerializedMessageCallback = + typename CallbackTypes::UniquePtrSerializedMessageCallback; + using UniquePtrSerializedMessageWithInfoCallback = + typename CallbackTypes::UniquePtrSerializedMessageWithInfoCallback; + using SharedConstPtrCallback = + typename CallbackTypes::SharedConstPtrCallback; + using SharedConstPtrROSMessageCallback = + typename CallbackTypes::SharedConstPtrROSMessageCallback; + using SharedConstPtrWithInfoCallback = + typename CallbackTypes::SharedConstPtrWithInfoCallback; + using SharedConstPtrWithInfoROSMessageCallback = + typename CallbackTypes::SharedConstPtrWithInfoROSMessageCallback; + using SharedConstPtrSerializedMessageCallback = + typename CallbackTypes::SharedConstPtrSerializedMessageCallback; + using SharedConstPtrSerializedMessageWithInfoCallback = + typename CallbackTypes::SharedConstPtrSerializedMessageWithInfoCallback; using ConstRefSharedConstPtrCallback = - typename HelperT::ConstRefSharedConstPtrCallback; + typename CallbackTypes::ConstRefSharedConstPtrCallback; + using ConstRefSharedConstPtrROSMessageCallback = + typename CallbackTypes::ConstRefSharedConstPtrROSMessageCallback; using ConstRefSharedConstPtrWithInfoCallback = - typename HelperT::ConstRefSharedConstPtrWithInfoCallback; + typename CallbackTypes::ConstRefSharedConstPtrWithInfoCallback; + using ConstRefSharedConstPtrWithInfoROSMessageCallback = + typename CallbackTypes::ConstRefSharedConstPtrWithInfoROSMessageCallback; + using ConstRefSharedConstPtrSerializedMessageCallback = + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageCallback; + using ConstRefSharedConstPtrSerializedMessageWithInfoCallback = + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageWithInfoCallback; + using SharedPtrCallback = + typename CallbackTypes::SharedPtrCallback; + using SharedPtrROSMessageCallback = + typename CallbackTypes::SharedPtrROSMessageCallback; + using SharedPtrWithInfoCallback = + typename CallbackTypes::SharedPtrWithInfoCallback; + using SharedPtrWithInfoROSMessageCallback = + typename CallbackTypes::SharedPtrWithInfoROSMessageCallback; + using SharedPtrSerializedMessageCallback = + typename CallbackTypes::SharedPtrSerializedMessageCallback; + using SharedPtrSerializedMessageWithInfoCallback = + typename CallbackTypes::SharedPtrSerializedMessageWithInfoCallback; + + template + struct NotNull + { + NotNull(const T * pointer_in, const char * msg) + : pointer(pointer_in) + { + if (pointer == nullptr) { + throw std::invalid_argument(msg); + } + } - using SharedPtrCallback = typename HelperT::SharedPtrCallback; - using SharedPtrWithInfoCallback = typename HelperT::SharedPtrWithInfoCallback; + const T * pointer; + }; public: explicit AnySubscriptionCallback(const AllocatorT & allocator = AllocatorT()) // NOLINT[runtime/explicit] + : subscribed_type_allocator_(allocator), + ros_message_type_allocator_(allocator) { - message_allocator_ = allocator; - allocator::set_allocator_for_deleter(&message_deleter_, &message_allocator_); + allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_); + allocator::set_allocator_for_deleter(&ros_message_type_deleter_, &ros_message_type_allocator_); } [[deprecated("use AnySubscriptionCallback(const AllocatorT & allocator) instead")]] explicit AnySubscriptionCallback(std::shared_ptr allocator) // NOLINT[runtime/explicit] - { - if (allocator == nullptr) { - throw std::runtime_error("invalid allocator"); - } - message_allocator_ = *allocator; - allocator::set_allocator_for_deleter(&message_deleter_, &message_allocator_); - } + : AnySubscriptionCallback(*NotNull(allocator.get(), "invalid allocator").pointer) + {} AnySubscriptionCallback(const AnySubscriptionCallback &) = default; @@ -190,40 +405,79 @@ class AnySubscriptionCallback } /// Function for shared_ptr to non-const MessageT, which is deprecated. + template // TODO(wjwwood): enable this deprecation after Galactic // [[deprecated( // "use 'void (std::shared_ptr)' instead" // )]] void - set_deprecated(std::function)> callback) - // set(CallbackT callback) + set_deprecated(std::function)> callback) { callback_variant_ = callback; } /// Function for shared_ptr to non-const MessageT with MessageInfo, which is deprecated. + template // TODO(wjwwood): enable this deprecation after Galactic // [[deprecated( // "use 'void (std::shared_ptr, const rclcpp::MessageInfo &)' instead" // )]] void - set_deprecated( - std::function, const rclcpp::MessageInfo &)> callback) + set_deprecated(std::function, const rclcpp::MessageInfo &)> callback) { callback_variant_ = callback; } - std::unique_ptr - create_unique_ptr_from_shared_ptr_message(const std::shared_ptr & message) + std::unique_ptr + create_ros_unique_ptr_from_ros_shared_ptr_message( + const std::shared_ptr & message) { - auto ptr = MessageAllocTraits::allocate(message_allocator_, 1); - MessageAllocTraits::construct(message_allocator_, ptr, *message); - return std::unique_ptr(ptr, message_deleter_); + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr, *message); + return std::unique_ptr(ptr, ros_message_type_deleter_); } + std::unique_ptr + create_serialized_message_unique_ptr_from_shared_ptr( + const std::shared_ptr & serialized_message) + { + auto ptr = SerializedMessageAllocatorTraits::allocate(serialized_message_allocator_, 1); + SerializedMessageAllocatorTraits::construct( + serialized_message_allocator_, ptr, *serialized_message); + return std::unique_ptr< + rclcpp::SerializedMessage, + SerializedMessageDeleter + >(ptr, serialized_message_deleter_); + } + + std::unique_ptr + create_custom_unique_ptr_from_custom_shared_ptr_message( + const std::shared_ptr & message) + { + auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); + SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr, *message); + return std::unique_ptr(ptr, subscribed_type_deleter_); + } + + std::unique_ptr + convert_ros_message_to_custom_type_unique_ptr(const ROSMessageType & msg) + { + if constexpr (rclcpp::TypeAdapter::is_specialized::value) { + auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); + SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr); + rclcpp::TypeAdapter::convert_to_custom(msg, *ptr); + return std::unique_ptr(ptr, subscribed_type_deleter_); + } else { + throw std::runtime_error( + "convert_ros_message_to_custom_type_unique_ptr " + "unexpectedly called without TypeAdapter"); + } + } + + // Dispatch when input is a ros message and the output could be anything. void dispatch( - std::shared_ptr message, + std::shared_ptr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), false); @@ -238,28 +492,162 @@ class AnySubscriptionCallback std::visit( [&message, &message_info, this](auto && callback) { using T = std::decay_t; - - if constexpr (std::is_same_v) { + static constexpr bool is_ta = rclcpp::TypeAdapter::is_specialized::value; + + // conditions for output is custom message + if constexpr (is_ta && std::is_same_v) { + // TODO(wjwwood): consider avoiding heap allocation for small messages + // maybe something like: + // if constexpr (rosidl_generator_traits::has_fixed_size && sizeof(T) < N) { + // ... on stack + // } + auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message); + callback(*local_message); + } else if constexpr (is_ta && std::is_same_v) { // NOLINT + auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message); + callback(*local_message, message_info); + } else if constexpr (is_ta && std::is_same_v) { + callback(convert_ros_message_to_custom_type_unique_ptr(*message)); + } else if constexpr (is_ta && std::is_same_v) { + callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message)); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info); + } + // conditions for output is ros message + else if constexpr (std::is_same_v) { // NOLINT callback(*message); - } else if constexpr (std::is_same_v) { + } else if constexpr (std::is_same_v) { callback(*message, message_info); - } else if constexpr (std::is_same_v) { - callback(create_unique_ptr_from_shared_ptr_message(message)); - } else if constexpr (std::is_same_v) { - callback(create_unique_ptr_from_shared_ptr_message(message), message_info); + } else if constexpr (std::is_same_v) { + callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message)); + } else if constexpr (std::is_same_v) { + callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message), message_info); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v|| + std::is_same_v) { callback(message); } else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v|| + std::is_same_v) + { + callback(message, message_info); + } + // condition to catch SerializedMessage types + else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v) + { + throw std::runtime_error( + "Cannot dispatch std::shared_ptr message " + "to rclcpp::SerializedMessage"); + } + // condition to catch unhandled callback types + else { // NOLINT[readability/braces] + static_assert(always_false_v, "unhandled callback type"); + } + }, callback_variant_); + TRACEPOINT(callback_end, static_cast(this)); + } + + // Dispatch when input is a serialized message and the output could be anything. + void + dispatch( + std::shared_ptr serialized_message, + const rclcpp::MessageInfo & message_info) + { + TRACEPOINT(callback_start, static_cast(this), false); + // Check if the variant is "unset", throw if it is. + if (callback_variant_.index() == 0) { + if (std::get<0>(callback_variant_) == nullptr) { + // This can happen if it is default initialized, or if it is assigned nullptr. + throw std::runtime_error("dispatch called on an unset AnySubscriptionCallback"); + } + } + // Dispatch. + std::visit( + [&serialized_message, &message_info, this](auto && callback) { + using T = std::decay_t; + + // condition to catch SerializedMessage types + if constexpr (std::is_same_v) { + callback(*serialized_message); + } else if constexpr (std::is_same_v) { + callback(*serialized_message, message_info); + } else if constexpr (std::is_same_v) { + callback(create_serialized_message_unique_ptr_from_shared_ptr(serialized_message)); + } else if constexpr (std::is_same_v) { + callback( + create_serialized_message_unique_ptr_from_shared_ptr(serialized_message), + message_info); + } else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v|| + std::is_same_v) + { + callback(create_serialized_message_unique_ptr_from_shared_ptr(serialized_message)); + } else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v|| + std::is_same_v) + { + callback( + create_serialized_message_unique_ptr_from_shared_ptr(serialized_message), + message_info); + } + // conditions for output anything else + else if constexpr ( // NOLINT[whitespace/newline] + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v) { - callback(message, message_info); - } else { + throw std::runtime_error( + "cannot dispatch rclcpp::SerializedMessage to " + "non-rclcpp::SerializedMessage callbacks"); + } + // condition to catch unhandled callback types + else { // NOLINT[readability/braces] static_assert(always_false_v, "unhandled callback type"); } }, callback_variant_); @@ -268,7 +656,7 @@ class AnySubscriptionCallback void dispatch_intra_process( - std::shared_ptr message, + std::shared_ptr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), true); @@ -283,32 +671,89 @@ class AnySubscriptionCallback std::visit( [&message, &message_info, this](auto && callback) { using T = std::decay_t; - - if constexpr (std::is_same_v) { + static constexpr bool is_ta = rclcpp::TypeAdapter::is_specialized::value; + + // conditions for custom type + if constexpr (is_ta && std::is_same_v) { + auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message); + callback(*local_message); + } else if constexpr (is_ta && std::is_same_v) { // NOLINT + auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message); + callback(*local_message, message_info); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message)); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message)); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info); + } + // conditions for ros message type + else if constexpr (std::is_same_v) { // NOLINT callback(*message); - } else if constexpr (std::is_same_v) { + } else if constexpr (std::is_same_v) { callback(*message, message_info); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v) { - callback(create_unique_ptr_from_shared_ptr_message(message)); + callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message)); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v) { - callback(create_unique_ptr_from_shared_ptr_message(message), message_info); + callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message), message_info); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v) { callback(message); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v) { callback(message, message_info); - } else { + } + // condition to catch SerializedMessage types + else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v) + { + throw std::runtime_error( + "Cannot dispatch std::shared_ptr message " + "to rclcpp::SerializedMessage"); + } + // condition to catch unhandled callback types + else { // NOLINT[readability/braces] static_assert(always_false_v, "unhandled callback type"); } }, callback_variant_); @@ -317,7 +762,7 @@ class AnySubscriptionCallback void dispatch_intra_process( - std::unique_ptr message, + std::unique_ptr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), true); @@ -330,28 +775,91 @@ class AnySubscriptionCallback } // Dispatch. std::visit( - [&message, &message_info](auto && callback) { + [&message, &message_info, this](auto && callback) { using T = std::decay_t; - - if constexpr (std::is_same_v) { + static constexpr bool is_ta = rclcpp::TypeAdapter::is_specialized::value; + + // conditions for custom type + if constexpr (is_ta && std::is_same_v) { + auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message); + callback(*local_message); + } else if constexpr (is_ta && std::is_same_v) { // NOLINT + auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message); + callback(*local_message, message_info); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message)); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message)); + } else if constexpr ( // NOLINT[readability/braces] + is_ta && ( + std::is_same_v|| + std::is_same_v + )) + { + callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info); + } + // conditions for ros message type + else if constexpr (std::is_same_v) { // NOLINT callback(*message); - } else if constexpr (std::is_same_v) { + } else if constexpr (std::is_same_v) { callback(*message, message_info); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v|| - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v) { callback(std::move(message)); } else if constexpr ( // NOLINT[readability/braces] - std::is_same_v|| - std::is_same_v|| - std::is_same_v|| - std::is_same_v) + std::is_same_v|| + std::is_same_v) { callback(std::move(message), message_info); - } else { + } else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v) + { + callback(std::move(message)); + } else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v) + { + callback(std::move(message), message_info); + } + // condition to catch SerializedMessage types + else if constexpr ( // NOLINT[readability/braces] + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v|| + std::is_same_v) + { + throw std::runtime_error( + "Cannot dispatch std::unique_ptr message " + "to rclcpp::SerializedMessage"); + } + // condition to catch unhandled callback types + else { // NOLINT[readability/braces] static_assert(always_false_v, "unhandled callback type"); } }, callback_variant_); @@ -369,6 +877,18 @@ class AnySubscriptionCallback std::holds_alternative(callback_variant_); } + constexpr + bool + is_serialized_message_callback() const + { + return + std::holds_alternative(callback_variant_) || + std::holds_alternative(callback_variant_) || + std::holds_alternative(callback_variant_) || + std::holds_alternative(callback_variant_) || + std::holds_alternative(callback_variant_); + } + void register_callback_for_tracing() { @@ -402,8 +922,12 @@ class AnySubscriptionCallback // For now, compose the variant into this class as a private attribute. typename HelperT::variant_type callback_variant_; - MessageAlloc message_allocator_; - MessageDeleter message_deleter_; + SubscribedTypeAllocator subscribed_type_allocator_; + SubscribedTypeDeleter subscribed_type_deleter_; + ROSMessageTypeAllocator ros_message_type_allocator_; + ROSMessageTypeDeleter ros_message_type_deleter_; + SerializedMessageAllocator serialized_message_allocator_; + SerializedMessageDeleter serialized_message_deleter_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index cf2458de62..5b84930ff7 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -47,11 +47,11 @@ template< typename MessageT, typename CallbackT, typename AllocatorT, - typename CallbackMessageT, typename SubscriptionT, typename MessageMemoryStrategyT, typename NodeParametersT, - typename NodeTopicsT> + typename NodeTopicsT, + typename ROSMessageType = typename SubscriptionT::ROSMessageType> typename std::shared_ptr create_subscription( NodeParametersT & node_parameters, @@ -70,7 +70,7 @@ create_subscription( using rclcpp::node_interfaces::get_node_topics_interface; auto node_topics_interface = get_node_topics_interface(node_topics); - std::shared_ptr> + std::shared_ptr> subscription_topic_stats = nullptr; if (rclcpp::detail::resolve_enable_topic_statistics( @@ -92,11 +92,11 @@ create_subscription( qos); subscription_topic_stats = std::make_shared< - rclcpp::topic_statistics::SubscriptionTopicStatistics + rclcpp::topic_statistics::SubscriptionTopicStatistics >(node_topics_interface->get_node_base_interface()->get_name(), publisher); std::weak_ptr< - rclcpp::topic_statistics::SubscriptionTopicStatistics + rclcpp::topic_statistics::SubscriptionTopicStatistics > weak_subscription_topic_stats(subscription_topic_stats); auto sub_call_back = [weak_subscription_topic_stats]() { auto subscription_topic_stats = weak_subscription_topic_stats.lock(); @@ -153,7 +153,6 @@ create_subscription( * \tparam MessageT * \tparam CallbackT * \tparam AllocatorT - * \tparam CallbackMessageT * \tparam SubscriptionT * \tparam MessageMemoryStrategyT * \tparam NodeT @@ -171,13 +170,8 @@ template< typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, - typename CallbackMessageT = - typename rclcpp::subscription_traits::has_message_type::type, - typename SubscriptionT = rclcpp::Subscription, - typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, - AllocatorT - >, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType, typename NodeT> typename std::shared_ptr create_subscription( @@ -194,7 +188,7 @@ create_subscription( ) { return rclcpp::detail::create_subscription< - MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT>( + MessageT, CallbackT, AllocatorT, SubscriptionT, MessageMemoryStrategyT>( node, node, topic_name, qos, std::forward(callback), options, msg_mem_strat); } @@ -206,13 +200,8 @@ template< typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, - typename CallbackMessageT = - typename rclcpp::subscription_traits::has_message_type::type, - typename SubscriptionT = rclcpp::Subscription, - typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, - AllocatorT - >> + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType> typename std::shared_ptr create_subscription( rclcpp::node_interfaces::NodeParametersInterface::SharedPtr & node_parameters, @@ -229,7 +218,7 @@ create_subscription( ) { return rclcpp::detail::create_subscription< - MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT>( + MessageT, CallbackT, AllocatorT, SubscriptionT, MessageMemoryStrategyT>( node_parameters, node_topics, topic_name, qos, std::forward(callback), options, msg_mem_strat); } diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index f3990cb76b..4d968ba7d8 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -33,6 +33,7 @@ #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/logger.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" @@ -181,7 +182,7 @@ class IntraProcessManager do_intra_process_publish( uint64_t intra_process_publisher_id, std::unique_ptr message, - std::shared_ptr::allocator_type> allocator) + typename allocator::AllocRebind::allocator_type & allocator) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -226,7 +227,7 @@ class IntraProcessManager { // Construct a new shared pointer from the message // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared(*allocator, *message); + auto shared_msg = std::allocate_shared(allocator, *message); this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); @@ -243,7 +244,7 @@ class IntraProcessManager do_intra_process_publish_and_return_shared( uint64_t intra_process_publisher_id, std::unique_ptr message, - std::shared_ptr::allocator_type> allocator) + typename allocator::AllocRebind::allocator_type & allocator) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -271,7 +272,7 @@ class IntraProcessManager } else { // Construct a new shared pointer from the message for the buffers that // do not require ownership and to return. - auto shared_msg = std::allocate_shared(*allocator, *message); + auto shared_msg = std::allocate_shared(allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( @@ -368,12 +369,12 @@ class IntraProcessManager auto subscription_base = subscription_it->second.subscription.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess + rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (nullptr == subscription) { throw std::runtime_error( "failed to dynamic cast SubscriptionIntraProcessBase to " - "SubscriptionIntraProcess, which " + "SubscriptionIntraProcessBuffer, which " "can happen when the publisher and subscription use different " "allocator types, which is not supported"); } @@ -393,7 +394,7 @@ class IntraProcessManager add_owned_msg_to_buffers( std::unique_ptr message, std::vector subscription_ids, - std::shared_ptr::allocator_type> allocator) + typename allocator::AllocRebind::allocator_type & allocator) { using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; @@ -406,12 +407,12 @@ class IntraProcessManager auto subscription_base = subscription_it->second.subscription.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess + rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (nullptr == subscription) { throw std::runtime_error( "failed to dynamic cast SubscriptionIntraProcessBase to " - "SubscriptionIntraProcess, which " + "SubscriptionIntraProcessBuffer, which " "can happen when the publisher and subscription use different " "allocator types, which is not supported"); } @@ -423,8 +424,8 @@ class IntraProcessManager // Copy the message since we have additional subscriptions to serve MessageUniquePtr copy_message; Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); - MessageAllocTraits::construct(*allocator.get(), ptr, *message); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); copy_message = MessageUniquePtr(ptr, deleter); subscription->provide_intra_process_message(std::move(copy_message)); diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 46e019daee..92ee8fde78 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -30,6 +30,7 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -44,21 +45,27 @@ template< typename Alloc = std::allocator, typename Deleter = std::default_delete, typename CallbackMessageT = MessageT> -class SubscriptionIntraProcess : public SubscriptionIntraProcessBase +class SubscriptionIntraProcess + : public SubscriptionIntraProcessBuffer< + MessageT, + Alloc, + Deleter + > { -public: - RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) - - using MessageAllocTraits = allocator::AllocRebind; - using MessageAlloc = typename MessageAllocTraits::allocator_type; - using ConstMessageSharedPtr = std::shared_ptr; - using MessageUniquePtr = std::unique_ptr; - - using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< + using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer< MessageT, Alloc, Deleter - >::UniquePtr; + >; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) + + using MessageAllocTraits = typename SubscriptionIntraProcessBufferT::MessageAllocTraits; + using MessageAlloc = typename SubscriptionIntraProcessBufferT::MessageAlloc; + using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstMessageSharedPtr; + using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::MessageUniquePtr; + using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; SubscriptionIntraProcess( AnySubscriptionCallback callback, @@ -67,31 +74,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase const std::string & topic_name, rmw_qos_profile_t qos_profile, rclcpp::IntraProcessBufferType buffer_type) - : SubscriptionIntraProcessBase(topic_name, qos_profile), + : SubscriptionIntraProcessBuffer( + allocator, + context, + topic_name, + qos_profile, + buffer_type), any_callback_(callback) { - if (!std::is_same::value) { - throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); - } - - // Create the intra-process buffer. - buffer_ = rclcpp::experimental::create_intra_process_buffer( - buffer_type, - qos_profile, - allocator); - - // Create the guard condition. - rcl_guard_condition_options_t guard_condition_options = - rcl_guard_condition_get_default_options(); - - gc_ = rcl_get_zero_initialized_guard_condition(); - rcl_ret_t ret = rcl_guard_condition_init( - &gc_, context->get_rcl_context().get(), guard_condition_options); - - if (RCL_RET_OK != ret) { - throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition"); - } - TRACEPOINT( rclcpp_subscription_callback_added, static_cast(this), @@ -104,22 +94,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase #endif } - ~SubscriptionIntraProcess() - { - if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Failed to destroy guard condition: %s", - rcutils_get_error_string().str); - } - } - - bool - is_ready(rcl_wait_set_t * wait_set) - { - (void) wait_set; - return buffer_->has_data(); - } + virtual ~SubscriptionIntraProcess() = default; std::shared_ptr take_data() @@ -128,9 +103,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase MessageUniquePtr unique_msg; if (any_callback_.use_take_shared_method()) { - shared_msg = buffer_->consume_shared(); + shared_msg = this->buffer_->consume_shared(); } else { - unique_msg = buffer_->consume_unique(); + unique_msg = this->buffer_->consume_unique(); } return std::static_pointer_cast( std::make_shared>( @@ -141,37 +116,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase void execute(std::shared_ptr & data) { - execute_impl(data); - } - - void - provide_intra_process_message(ConstMessageSharedPtr message) - { - buffer_->add_shared(std::move(message)); - trigger_guard_condition(); - } - - void - provide_intra_process_message(MessageUniquePtr message) - { - buffer_->add_unique(std::move(message)); - trigger_guard_condition(); - } - - bool - use_take_shared_method() const - { - return buffer_->use_take_shared_method(); - } - -private: - void - trigger_guard_condition() - { - rcl_ret_t ret = rcl_trigger_guard_condition(&gc_); - (void)ret; + execute_impl(data); } +protected: template typename std::enable_if::value, void>::type execute_impl(std::shared_ptr & data) @@ -206,7 +154,6 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } AnySubscriptionCallback any_callback_; - BufferUniquePtr buffer_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp new file mode 100644 index 0000000000..b58c2853ca --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -0,0 +1,142 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ +#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ + +#include + +#include +#include +#include +#include +#include +#include + +#include "rcl/error_handling.h" + +#include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" +#include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/waitable.hpp" +#include "tracetools/tracetools.h" + +namespace rclcpp +{ +namespace experimental +{ + +template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete +> +class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer) + + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using ConstMessageSharedPtr = std::shared_ptr; + using MessageUniquePtr = std::unique_ptr; + + using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< + MessageT, + Alloc, + Deleter + >::UniquePtr; + + SubscriptionIntraProcessBuffer( + std::shared_ptr allocator, + rclcpp::Context::SharedPtr context, + const std::string & topic_name, + rmw_qos_profile_t qos_profile, + rclcpp::IntraProcessBufferType buffer_type) + : SubscriptionIntraProcessBase(topic_name, qos_profile) + { + // Create the intra-process buffer. + buffer_ = rclcpp::experimental::create_intra_process_buffer( + buffer_type, + qos_profile, + allocator); + + // Create the guard condition. + rcl_guard_condition_options_t guard_condition_options = + rcl_guard_condition_get_default_options(); + + gc_ = rcl_get_zero_initialized_guard_condition(); + rcl_ret_t ret = rcl_guard_condition_init( + &gc_, context->get_rcl_context().get(), guard_condition_options); + + if (RCL_RET_OK != ret) { + throw std::runtime_error( + "SubscriptionIntraProcessBuffer init error initializing guard condition"); + } + } + + virtual ~SubscriptionIntraProcessBuffer() + { + if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Failed to destroy guard condition: %s", + rcutils_get_error_string().str); + } + } + + bool + is_ready(rcl_wait_set_t * wait_set) + { + (void) wait_set; + return buffer_->has_data(); + } + + void + provide_intra_process_message(ConstMessageSharedPtr message) + { + buffer_->add_shared(std::move(message)); + trigger_guard_condition(); + } + + void + provide_intra_process_message(MessageUniquePtr message) + { + buffer_->add_unique(std::move(message)); + trigger_guard_condition(); + } + + bool + use_take_shared_method() const + { + return buffer_->use_take_shared_method(); + } + +protected: + void + trigger_guard_condition() + { + rcl_ret_t ret = rcl_trigger_guard_condition(&gc_); + (void)ret; + } + + BufferUniquePtr buffer_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ diff --git a/rclcpp/include/rclcpp/generic_subscription.hpp b/rclcpp/include/rclcpp/generic_subscription.hpp index d9f40d868b..673712eedb 100644 --- a/rclcpp/include/rclcpp/generic_subscription.hpp +++ b/rclcpp/include/rclcpp/generic_subscription.hpp @@ -136,6 +136,13 @@ class GenericSubscription : public rclcpp::SubscriptionBase void handle_message( std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override; + /// Handle dispatching rclcpp::SerializedMessage to user callback. + RCLCPP_PUBLIC + void + handle_serialized_message( + const std::shared_ptr & serialized_message, + const rclcpp::MessageInfo & message_info) override; + /// This function is currently not implemented. RCLCPP_PUBLIC void handle_loaned_message( diff --git a/rclcpp/include/rclcpp/get_message_type_support_handle.hpp b/rclcpp/include/rclcpp/get_message_type_support_handle.hpp new file mode 100644 index 0000000000..2f19528baf --- /dev/null +++ b/rclcpp/include/rclcpp/get_message_type_support_handle.hpp @@ -0,0 +1,90 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__GET_MESSAGE_TYPE_SUPPORT_HANDLE_HPP_ +#define RCLCPP__GET_MESSAGE_TYPE_SUPPORT_HANDLE_HPP_ + +#include + +#include "rosidl_runtime_cpp/traits.hpp" +#include "rosidl_runtime_cpp/message_type_support_decl.hpp" +#include "rosidl_typesupport_cpp/message_type_support.hpp" + +#include "rclcpp/type_adapter.hpp" + +/// Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types. + +namespace rclcpp +{ + +/// Specialization for when MessageT is an actual ROS message type. +template +constexpr +typename std::enable_if_t< + rosidl_generator_traits::is_message::value, + const rosidl_message_type_support_t & +> +get_message_type_support_handle() +{ + auto handle = rosidl_typesupport_cpp::get_message_type_support_handle(); + if (!handle) { + throw std::runtime_error("Type support handle unexpectedly nullptr"); + } + return *handle; +} + +/// Specialization for when MessageT is an adapted type using rclcpp::TypeAdapter. +template +constexpr +typename std::enable_if_t< + !rosidl_generator_traits::is_message::value && + rclcpp::TypeAdapter::is_specialized::value, + const rosidl_message_type_support_t & +> +get_message_type_support_handle() +{ + auto handle = rosidl_typesupport_cpp::get_message_type_support_handle< + typename TypeAdapter::ros_message_type + >(); + if (!handle) { + throw std::runtime_error("Type support handle unexpectedly nullptr"); + } + return *handle; +} + +/// Specialization for when MessageT is not a ROS message nor an adapted type. +/** + * This specialization is a pass through runtime check, which allows a better + * static_assert to catch this issue further down the line. + * This should never get to be called in practice, and is purely defensive. + */ +template< + typename AdaptedType +> +constexpr +typename std::enable_if_t< + !rosidl_generator_traits::is_message::value && + !TypeAdapter::is_specialized::value, + const rosidl_message_type_support_t & +> +get_message_type_support_handle() +{ + throw std::runtime_error( + "this specialization of rclcpp::get_message_type_support_handle() " + "should never be called"); +} + +} // namespace rclcpp + +#endif // RCLCPP__GET_MESSAGE_TYPE_SUPPORT_HANDLE_HPP_ diff --git a/rclcpp/include/rclcpp/is_ros_compatible_type.hpp b/rclcpp/include/rclcpp/is_ros_compatible_type.hpp new file mode 100644 index 0000000000..97c6ad6a27 --- /dev/null +++ b/rclcpp/include/rclcpp/is_ros_compatible_type.hpp @@ -0,0 +1,35 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__IS_ROS_COMPATIBLE_TYPE_HPP_ +#define RCLCPP__IS_ROS_COMPATIBLE_TYPE_HPP_ + +#include "rosidl_runtime_cpp/traits.hpp" + +#include "rclcpp/type_adapter.hpp" + +namespace rclcpp +{ + +template +struct is_ros_compatible_type +{ + static constexpr bool value = + rosidl_generator_traits::is_message::value || + rclcpp::TypeAdapter::is_specialized::value; +}; + +} // namespace rclcpp + +#endif // RCLCPP__IS_ROS_COMPATIBLE_TYPE_HPP_ diff --git a/rclcpp/include/rclcpp/loaned_message.hpp b/rclcpp/include/rclcpp/loaned_message.hpp index 1d7b0d5e4f..4dd396e033 100644 --- a/rclcpp/include/rclcpp/loaned_message.hpp +++ b/rclcpp/include/rclcpp/loaned_message.hpp @@ -103,6 +103,9 @@ class LoanedMessage * \param[in] allocator Allocator instance in case middleware can not allocate messages * \throws anything rclcpp::exceptions::throw_from_rcl_error can throw. */ + [[ + deprecated("used the LoanedMessage constructor that does not use a shared_ptr to the allocator") + ]] LoanedMessage( const rclcpp::PublisherBase * pub, std::shared_ptr> allocator) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index b2156a0068..2e44c6c60b 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -206,13 +206,8 @@ class Node : public std::enable_shared_from_this typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, - typename CallbackMessageT = - typename rclcpp::subscription_traits::has_message_type::type, - typename SubscriptionT = rclcpp::Subscription, - typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, - AllocatorT - > + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType > std::shared_ptr create_subscription( diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 48e2430541..df6039cf49 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -86,7 +86,6 @@ template< typename MessageT, typename CallbackT, typename AllocatorT, - typename CallbackMessageT, typename SubscriptionT, typename MessageMemoryStrategyT> std::shared_ptr diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index f3fbc34faa..c7f77420c4 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -15,9 +15,6 @@ #ifndef RCLCPP__PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_HPP_ -#include -#include - #include #include #include @@ -27,16 +24,21 @@ #include "rcl/error_handling.h" #include "rcl/publisher.h" +#include "rmw/error_handling.h" +#include "rmw/rmw.h" #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" +#include "rclcpp/get_message_type_support_handle.hpp" +#include "rclcpp/is_ros_compatible_type.hpp" #include "rclcpp/loaned_message.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/publisher_base.hpp" #include "rclcpp/publisher_options.hpp" +#include "rclcpp/type_adapter.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -49,15 +51,62 @@ template class LoanedMessage; /// A publisher publishes messages of any type to a topic. +/** + * MessageT must be a: + * + * - ROS message type with its own type support (e.g. std_msgs::msgs::String), or a + * - rclcpp::TypeAdapter + * (e.g. rclcpp::TypeAdapter type + * (e.g. TypeAdapter), PublishedType will + * be the custom type, and ROSMessageType will be the ros message type. + * + * This is achieved because of the "identity specialization" for TypeAdapter, + * which returns itself if it is already a TypeAdapter, and the default + * specialization which allows ROSMessageType to be void. + * \sa rclcpp::TypeAdapter for more details. + */ template> class Publisher : public PublisherBase { public: - using MessageAllocatorTraits = allocator::AllocRebind; - using MessageAllocator = typename MessageAllocatorTraits::allocator_type; - using MessageDeleter = allocator::Deleter; - using MessageUniquePtr = std::unique_ptr; - using MessageSharedPtr = std::shared_ptr; + static_assert( + rclcpp::is_ros_compatible_type::value, + "given message type is not compatible with ROS and cannot be used with a Publisher"); + + /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + /// MessageT::ros_message_type if MessageT is a TypeAdapter, otherwise just MessageT. + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + using PublishedTypeAllocatorTraits = allocator::AllocRebind; + using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; + using PublishedTypeDeleter = allocator::Deleter; + + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + using MessageAllocatorTraits + [[deprecated("use PublishedTypeAllocatorTraits")]] = + PublishedTypeAllocatorTraits; + using MessageAllocator + [[deprecated("use PublishedTypeAllocator")]] = + PublishedTypeAllocator; + using MessageDeleter + [[deprecated("use PublishedTypeDeleter")]] = + PublishedTypeDeleter; + using MessageUniquePtr + [[deprecated("use std::unique_ptr")]] = + std::unique_ptr; + using MessageSharedPtr + [[deprecated("use std::shared_ptr")]] = + std::shared_ptr; RCLCPP_SMART_PTR_DEFINITIONS(Publisher) @@ -80,12 +129,14 @@ class Publisher : public PublisherBase : PublisherBase( node_base, topic, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::get_message_type_support_handle(), options.template to_rcl_publisher_options(qos)), options_(options), - message_allocator_(new MessageAllocator(*options.get_allocator().get())) + published_type_allocator_(*options.get_allocator()), + ros_message_type_allocator_(*options.get_allocator()) { - allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + allocator::set_allocator_for_deleter(&published_type_deleter_, &published_type_allocator_); + allocator::set_allocator_for_deleter(&ros_message_type_deleter_, &ros_message_type_allocator_); if (options_.event_callbacks.deadline_callback) { this->add_event_handler( @@ -170,21 +221,33 @@ class Publisher : public PublisherBase * allocator. * \sa rclcpp::LoanedMessage for details of the LoanedMessage class. * - * \return LoanedMessage containing memory for a ROS message of type MessageT + * \return LoanedMessage containing memory for a ROS message of type ROSMessageType */ - rclcpp::LoanedMessage + rclcpp::LoanedMessage borrow_loaned_message() { - return rclcpp::LoanedMessage(this, this->get_allocator()); + return rclcpp::LoanedMessage( + *this, + this->get_ros_message_type_allocator()); } - /// Send a message to the topic for this publisher. + /// Publish a message on the topic. /** - * This function is templated on the input message type, MessageT. - * \param[in] msg A shared pointer to the message to send. + * This signature is enabled if the element_type of the std::unique_ptr is + * a ROS message type, as opposed to the custom_type of a TypeAdapter, and + * that type matches the type given when creating the publisher. + * + * This signature allows the user to give ownership of the message to rclcpp, + * allowing for more efficient intra-process communication optimizations. + * + * \param[in] msg A unique pointer to the message to send. */ - virtual void - publish(std::unique_ptr msg) + template + typename std::enable_if_t< + rosidl_generator_traits::is_message::value && + std::is_same::value + > + publish(std::unique_ptr msg) { if (!intra_process_is_enabled_) { this->do_inter_process_publish(*msg); @@ -207,8 +270,24 @@ class Publisher : public PublisherBase } } - virtual void - publish(const MessageT & msg) + /// Publish a message on the topic. + /** + * This signature is enabled if the object being published is + * a ROS message type, as opposed to the custom_type of a TypeAdapter, and + * that type matches the type given when creating the publisher. + * + * This signature allows the user to give a reference to a message, which is + * copied onto the heap without modification so that a copy can be owned by + * rclcpp and ownership of the copy can be moved later if needed. + * + * \param[in] msg A const reference to the message to send. + */ + template + typename std::enable_if_t< + rosidl_generator_traits::is_message::value && + std::is_same::value + > + publish(const T & msg) { // Avoid allocating when not using intra process. if (!intra_process_is_enabled_) { @@ -218,12 +297,77 @@ class Publisher : public PublisherBase // Otherwise we have to allocate memory in a unique_ptr and pass it along. // As the message is not const, a copy should be made. // A shared_ptr could also be constructed here. - auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1); - MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg); - MessageUniquePtr unique_msg(ptr, message_deleter_); + auto unique_msg = this->duplicate_ros_message_as_unique_ptr(msg); this->publish(std::move(unique_msg)); } + /// Publish a message on the topic. + /** + * This signature is enabled if this class was created with a TypeAdapter and + * the element_type of the std::unique_ptr matches the custom_type for the + * TypeAdapter used with this class. + * + * This signature allows the user to give ownership of the message to rclcpp, + * allowing for more efficient intra-process communication optimizations. + * + * \param[in] msg A unique pointer to the message to send. + */ + template + typename std::enable_if_t< + rclcpp::TypeAdapter::is_specialized::value && + std::is_same::value + > + publish(std::unique_ptr msg) + { + // TODO(wjwwood): later update this to give the unique_ptr to the intra + // process manager and let it decide if it needs to be converted or not. + // For now, convert it unconditionally and pass it the ROSMessageType + // publish function specialization. + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *unique_ros_msg); + this->publish(std::move(unique_ros_msg)); + } + + /// Publish a message on the topic. + /** + * This signature is enabled if this class was created with a TypeAdapter and + * the given type matches the custom_type of the TypeAdapter. + * + * This signature allows the user to give a reference to a message, which is + * copied onto the heap without modification so that a copy can be owned by + * rclcpp and ownership of the copy can be moved later if needed. + * + * \param[in] msg A const reference to the message to send. + */ + template + typename std::enable_if_t< + rclcpp::TypeAdapter::is_specialized::value && + std::is_same::value + > + publish(const T & msg) + { + // TODO(wjwwood): later update this to give the unique_ptr to the intra + // process manager and let it decide if it needs to be converted or not. + // For now, convert it unconditionally and pass it the ROSMessageType + // publish function specialization. + + // Avoid allocating when not using intra process. + if (!intra_process_is_enabled_) { + // Convert to the ROS message equivalent and publish it. + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message(msg, ros_msg); + // In this case we're not using intra process. + return this->do_inter_process_publish(ros_msg); + } + // Otherwise we have to allocate memory in a unique_ptr, convert it, + // and pass it along. + // As the message is not const, a copy should be made. + // A shared_ptr could also be constructed here. + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter::convert_to_ros_message(msg, *unique_ros_msg); + this->publish(std::move(unique_ros_msg)); + } + void publish(const rcl_serialized_message_t & serialized_msg) { @@ -245,7 +389,7 @@ class Publisher : public PublisherBase * \param loaned_msg The LoanedMessage instance to be published. */ void - publish(rclcpp::LoanedMessage && loaned_msg) + publish(rclcpp::LoanedMessage && loaned_msg) { if (!loaned_msg.is_valid()) { throw std::runtime_error("loaned message is not valid"); @@ -271,15 +415,28 @@ class Publisher : public PublisherBase } } - std::shared_ptr + [[deprecated("use get_published_type_allocator() or get_ros_message_type_allocator() instead")]] + std::shared_ptr get_allocator() const { - return message_allocator_; + return std::make_shared(published_type_allocator_); + } + + PublishedTypeAllocator + get_published_type_allocator() const + { + return published_type_allocator_; + } + + ROSMessageTypeAllocator + get_ros_message_type_allocator() const + { + return ros_message_type_allocator_; } protected: void - do_inter_process_publish(const MessageT & msg) + do_inter_process_publish(const ROSMessageType & msg) { TRACEPOINT( rclcpp_publish, @@ -316,7 +473,8 @@ class Publisher : public PublisherBase } void - do_loaned_message_publish(std::unique_ptr> msg) + do_loaned_message_publish( + std::unique_ptr> msg) { auto status = rcl_publish_loaned_message(publisher_handle_.get(), msg.get(), nullptr); @@ -336,7 +494,7 @@ class Publisher : public PublisherBase } void - do_intra_process_publish(std::unique_ptr msg) + do_intra_process_publish(std::unique_ptr msg) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -347,14 +505,15 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), - message_allocator_); + ros_message_type_allocator_); } - std::shared_ptr - do_intra_process_publish_and_return_shared(std::unique_ptr msg) + std::shared_ptr + do_intra_process_publish_and_return_shared( + std::unique_ptr msg) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -365,10 +524,29 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_shared( + return ipm->template do_intra_process_publish_and_return_shared( intra_process_publisher_id_, std::move(msg), - message_allocator_); + ros_message_type_allocator_); + } + + /// Return a new unique_ptr using the ROSMessageType of the publisher. + std::unique_ptr + create_ros_message_unique_ptr() + { + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr); + return std::unique_ptr(ptr, ros_message_type_deleter_); + } + + /// Duplicate a given ros message as a unique_ptr. + std::unique_ptr + duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg) + { + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr, msg); + return std::unique_ptr(ptr, ros_message_type_deleter_); } /// Copy of original options passed during construction. @@ -378,9 +556,10 @@ class Publisher : public PublisherBase */ const rclcpp::PublisherOptionsWithAllocator options_; - std::shared_ptr message_allocator_; - - MessageDeleter message_deleter_; + PublishedTypeAllocator published_type_allocator_; + PublishedTypeDeleter published_type_deleter_; + ROSMessageTypeAllocator ros_message_type_allocator_; + ROSMessageTypeDeleter ros_message_type_deleter_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 4b90bf3561..b8f68dc77e 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -47,7 +47,7 @@ class RCLCPP_PUBLIC_TYPE SerializedMessage * \param[in] initial_capacity The amount of memory to be allocated. * \param[in] allocator The allocator to be used for the initialization. */ - SerializedMessage( + explicit SerializedMessage( size_t initial_capacity, const rcl_allocator_t & allocator = rcl_get_default_allocator()); diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index fe98dc5796..dd73bffa46 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -60,10 +60,16 @@ class NodeTopicsInterface; /// Subscription implementation, templated on the type of message this subscription receives. template< - typename CallbackMessageT, + typename MessageT, typename AllocatorT = std::allocator, + /// MessageT::custom_type if MessageT is a TypeAdapter, + /// otherwise just MessageT. + typename SubscribedT = typename rclcpp::TypeAdapter::custom_type, + /// MessageT::ros_message_type if MessageT is a TypeAdapter, + /// otherwise just MessageT. + typename ROSMessageT = typename rclcpp::TypeAdapter::ros_message_type, typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, + ROSMessageT, AllocatorT >> class Subscription : public SubscriptionBase @@ -71,14 +77,36 @@ class Subscription : public SubscriptionBase friend class rclcpp::node_interfaces::NodeTopicsInterface; public: - using MessageAllocatorTraits = allocator::AllocRebind; - using MessageAllocator = typename MessageAllocatorTraits::allocator_type; - using MessageDeleter = allocator::Deleter; - using ConstMessageSharedPtr = std::shared_ptr; - using MessageUniquePtr = std::unique_ptr; + // Redeclare these here to use outside of the class. + using SubscribedType = SubscribedT; + using ROSMessageType = ROSMessageT; + using MessageMemoryStrategyType = MessageMemoryStrategyT; + + using SubscribedTypeAllocatorTraits = allocator::AllocRebind; + using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; + using SubscribedTypeDeleter = allocator::Deleter; + + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] = + ROSMessageTypeAllocatorTraits; + using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] = + ROSMessageTypeAllocator; + using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] = + ROSMessageTypeDeleter; + + using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr; + using MessageUniquePtr + [[deprecated("use std::unique_ptr instead")]] = + std::unique_ptr; + +private: using SubscriptionTopicStatisticsSharedPtr = - std::shared_ptr>; + std::shared_ptr>; +public: RCLCPP_SMART_PTR_DEFINITIONS(Subscription) /// Default constructor. @@ -104,7 +132,7 @@ class Subscription : public SubscriptionBase const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rclcpp::QoS & qos, - AnySubscriptionCallback callback, + AnySubscriptionCallback callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) @@ -112,8 +140,8 @@ class Subscription : public SubscriptionBase node_base, type_support_handle, topic_name, - options.template to_rcl_subscription_options(qos), - rclcpp::subscription_traits::is_serialized_subscription_argument::value), + options.template to_rcl_subscription_options(qos), + callback.is_serialized_message_callback()), any_callback_(callback), options_(options), message_memory_strategy_(message_memory_strategy) @@ -241,11 +269,34 @@ class Subscription : public SubscriptionBase * \throws any rcl errors from rcl_take, \sa rclcpp::exceptions::throw_from_rcl_error() */ bool - take(CallbackMessageT & message_out, rclcpp::MessageInfo & message_info_out) + take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out) { return this->take_type_erased(static_cast(&message_out), message_info_out); } + /// Take the next message from the inter-process subscription. + /** + * This verison takes a SubscribedType which is different frmo the + * ROSMessageType when a rclcpp::TypeAdapter is in used. + * + * \sa take(ROSMessageType &, rclcpp::MessageInfo &) + */ + template + std::enable_if_t< + !rosidl_generator_traits::is_message::value && + std::is_same_v, + bool + > + take(TakeT & message_out, rclcpp::MessageInfo & message_info_out) + { + ROSMessageType local_message; + bool taken = this->take_type_erased(static_cast(&local_message), message_info_out); + if (taken) { + rclcpp::TypeAdapter::convert_to_custom(local_message, message_out); + } + return taken; + } + std::shared_ptr create_message() override { @@ -272,7 +323,7 @@ class Subscription : public SubscriptionBase // we should ignore this copy of the message. return; } - auto typed_message = std::static_pointer_cast(message); + auto typed_message = std::static_pointer_cast(message); std::chrono::time_point now; if (subscription_topic_statistics_) { @@ -290,15 +341,24 @@ class Subscription : public SubscriptionBase } } + void + handle_serialized_message( + const std::shared_ptr & serialized_message, + const rclcpp::MessageInfo & message_info) override + { + // TODO(wjwwood): enable topic statistics for serialized messages + any_callback_.dispatch(serialized_message, message_info); + } + void handle_loaned_message( void * loaned_message, const rclcpp::MessageInfo & message_info) override { - auto typed_message = static_cast(loaned_message); + auto typed_message = static_cast(loaned_message); // message is loaned, so we have to make sure that the deleter does not deallocate the message - auto sptr = std::shared_ptr( - typed_message, [](CallbackMessageT * msg) {(void) msg;}); + auto sptr = std::shared_ptr( + typed_message, [](ROSMessageType * msg) {(void) msg;}); any_callback_.dispatch(sptr, message_info); } @@ -309,7 +369,7 @@ class Subscription : public SubscriptionBase void return_message(std::shared_ptr & message) override { - auto typed_message = std::static_pointer_cast(message); + auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); } @@ -332,21 +392,24 @@ class Subscription : public SubscriptionBase private: RCLCPP_DISABLE_COPY(Subscription) - AnySubscriptionCallback any_callback_; + AnySubscriptionCallback any_callback_; /// Copy of original options passed during construction. /** * It is important to save a copy of this so that the rmw payload which it * may contain is kept alive for the duration of the subscription. */ const rclcpp::SubscriptionOptionsWithAllocator options_; - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; + /// Component which computes and publishes topic statistics for this subscriber SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr}; + using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< - CallbackMessageT, + ROSMessageType, AllocatorT, - typename MessageUniquePtr::deleter_type>; + ROSMessageTypeDeleter, + MessageT>; std::shared_ptr subscription_intra_process_; }; diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index b0642bb69d..7653292b5e 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -183,6 +183,13 @@ class SubscriptionBase : public std::enable_shared_from_this void handle_message(std::shared_ptr & message, const rclcpp::MessageInfo & message_info) = 0; + RCLCPP_PUBLIC + virtual + void + handle_serialized_message( + const std::shared_ptr & serialized_message, + const rclcpp::MessageInfo & message_info) = 0; + RCLCPP_PUBLIC virtual void diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index 7708f27df0..4da6c236bf 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -25,6 +25,7 @@ #include "rosidl_typesupport_cpp/message_type_support.hpp" #include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/get_message_type_support_handle.hpp" #include "rclcpp/intra_process_buffer_type.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/qos.hpp" @@ -74,26 +75,23 @@ template< typename MessageT, typename CallbackT, typename AllocatorT, - typename CallbackMessageT = - typename rclcpp::subscription_traits::has_message_type::type, - typename SubscriptionT = rclcpp::Subscription, - typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, - AllocatorT - >> + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType, + typename ROSMessageType = typename SubscriptionT::ROSMessageType +> SubscriptionFactory create_subscription_factory( CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, - std::shared_ptr> + std::shared_ptr> subscription_topic_stats = nullptr ) { auto allocator = options.get_allocator(); using rclcpp::AnySubscriptionCallback; - AnySubscriptionCallback any_subscription_callback(*allocator); + AnySubscriptionCallback any_subscription_callback(*allocator); any_subscription_callback.set(std::forward(callback)); SubscriptionFactory factory { @@ -107,9 +105,9 @@ create_subscription_factory( using rclcpp::Subscription; using rclcpp::SubscriptionBase; - auto sub = Subscription::make_shared( + auto sub = Subscription::make_shared( node_base, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::get_message_type_support_handle(), topic_name, qos, any_subscription_callback, diff --git a/rclcpp/include/rclcpp/type_adapter.hpp b/rclcpp/include/rclcpp/type_adapter.hpp new file mode 100644 index 0000000000..0b779c8062 --- /dev/null +++ b/rclcpp/include/rclcpp/type_adapter.hpp @@ -0,0 +1,197 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__TYPE_ADAPTER_HPP_ +#define RCLCPP__TYPE_ADAPTER_HPP_ + +#include + +namespace rclcpp +{ + +/// Template structure used to adapt custom, user-defined types to ROS types. +/** + * Adapting a custom, user-defined type to a ROS type allows that custom type + * to be used when publishing and subscribing in ROS. + * + * In order to adapt a custom type to a ROS type, the user must create a + * template specialization of this structure for the custom type. + * In that specialization they must: + * + * - change `is_specialized` to `std::true_type`, + * - specify the custom type with `using custom_type = ...`, + * - specify the ROS type with `using ros_message_type = ...`, + * - provide static convert functions with the signatures: + * - static void convert_to_ros(const custom_type &, ros_message_type &) + * - static void convert_to_custom(const ros_message_type &, custom_type &) + * + * The convert functions must convert from one type to the other. + * + * For example, here is a theoretical example for adapting `std::string` to the + * `std_msgs::msg::String` ROS message type: + * + * template<> + * struct rclcpp::TypeAdapter + * { + * using is_specialized = std::true_type; + * using custom_type = std::string; + * using ros_message_type = std_msgs::msg::String; + * + * static + * void + * convert_to_ros_message( + * const custom_type & source, + * ros_message_type & destination) + * { + * destination.data = source; + * } + * + * static + * void + * convert_to_custom( + * const ros_message_type & source, + * custom_type & destination) + * { + * destination = source.data; + * } + * }; + * + * The adapter can then be used when creating a publisher or subscription, + * e.g.: + * + * using MyAdaptedType = TypeAdapter; + * auto pub = node->create_publisher("topic", 10); + * auto sub = node->create_subscription( + * "topic", + * 10, + * [](const std::string & msg) {...}); + * + * You can also be more declarative by using the adapt_type::as metafunctions, + * which are a bit less ambiguous to read: + * + * using AdaptedType = rclcpp::adapt_type::as; + * auto pub = node->create_publisher(...); + * + * If you wish, you may associate a custom type with a single ROS message type, + * allowing you to be a bit more brief when creating entities, e.g.: + * + * // First you must declare the association, this is similar to how you + * // would avoid using the namespace in C++ by doing `using std::vector;`. + * RCLCPP_USING_CUSTOM_TYPE_AS_ROS_MESSAGE_TYPE(std::string, std_msgs::msg::String); + * + * // Then you can create things with just the custom type, and the ROS + * // message type is implied based on the previous statement. + * auto pub = node->create_publisher(...); + */ +template +struct TypeAdapter +{ + using is_specialized = std::false_type; + using custom_type = CustomType; + // In this case, the CustomType is the only thing given, or there is no specialization. + // Assign ros_message_type to CustomType for the former case. + using ros_message_type = CustomType; +}; + +/// Helper template to determine if a type is a TypeAdapter, false specialization. +template +struct is_type_adapter : std::false_type {}; + +/// Helper template to determine if a type is a TypeAdapter, true specialization. +template +struct is_type_adapter>: std::true_type {}; + +/// Identity specialization for TypeAdapter. +template +struct TypeAdapter::value>>: T {}; + +namespace detail +{ + +template +struct assert_type_pair_is_specialized_type_adapter +{ + using type_adapter = TypeAdapter; + static_assert( + type_adapter::is_specialized::value, + "No type adapter for this custom type/ros message type pair"); +}; + +} // namespace detail + +/// Template metafunction that can make the type being adapted explicit. +template +struct adapt_type +{ + template + using as = typename ::rclcpp::detail::assert_type_pair_is_specialized_type_adapter< + CustomType, + ROSMessageType + >::type_adapter; +}; + +/// Implicit type adapter used as a short hand way to create something with just the custom type. +/** + * This is used when creating a publisher or subscription using just the custom + * type in conjunction with RCLCPP_USING_CUSTOM_TYPE_AS_ROS_MESSAGE_TYPE(). + * For example: + * + * #include "type_adapter_for_std_string_to_std_msgs_String.hpp" + * + * RCLCPP_USING_CUSTOM_TYPE_AS_ROS_MESSAGE_TYPE(std::string, std_msgs::msg::String); + * + * int main(...) { + * // ... + * auto pub = node->create_publisher(...); + * } + * + * \sa TypeAdapter for more examples. + */ +template +struct ImplicitTypeAdapter +{ + using is_specialized = std::false_type; +}; + +/// Specialization of TypeAdapter for ImplicitTypeAdapter. +/** + * This allows for things like this: + * + * RCLCPP_USING_CUSTOM_TYPE_AS_ROS_MESSAGE_TYPE(std::string, std_msgs::msg::String); + * auto pub = node->create_publisher("topic", 10); + * + */ +template +struct TypeAdapter::is_specialized::value>> + : ImplicitTypeAdapter +{}; + +/// Assigns the custom type implicitly to the given custom type/ros message type pair. +/** + * \sa TypeAdapter + * \sa ImplicitTypeAdapter + */ +#define RCLCPP_USING_CUSTOM_TYPE_AS_ROS_MESSAGE_TYPE(CustomType, ROSMessageType) \ + template<> \ + struct ::rclcpp::ImplicitTypeAdapter \ + : public ::rclcpp::TypeAdapter \ + { \ + static_assert( \ + is_specialized::value, \ + "Cannot use custom type as ros type when there is no TypeAdapter for that pair"); \ + }; + +} // namespace rclcpp + +#endif // RCLCPP__TYPE_ADAPTER_HPP_ diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 20876668a4..ea08204798 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -589,8 +589,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);}, [&]() { - auto void_serialized_msg = std::static_pointer_cast(serialized_msg); - subscription->handle_message(void_serialized_msg, message_info); + subscription->handle_serialized_message(serialized_msg, message_info); }); subscription->return_serialized_message(serialized_msg); } else if (subscription->can_loan_messages()) { diff --git a/rclcpp/src/rclcpp/generic_subscription.cpp b/rclcpp/src/rclcpp/generic_subscription.cpp index d72d21a0f9..cc50955773 100644 --- a/rclcpp/src/rclcpp/generic_subscription.cpp +++ b/rclcpp/src/rclcpp/generic_subscription.cpp @@ -36,11 +36,19 @@ std::shared_ptr GenericSubscription::create_serialize } void GenericSubscription::handle_message( - std::shared_ptr & message, const rclcpp::MessageInfo & message_info) + std::shared_ptr &, + const rclcpp::MessageInfo &) { - (void) message_info; - auto typed_message = std::static_pointer_cast(message); - callback_(typed_message); + throw rclcpp::exceptions::UnimplementedError( + "handle_message is not implemented for GenericSubscription"); +} + +void +GenericSubscription::handle_serialized_message( + const std::shared_ptr & message, + const rclcpp::MessageInfo &) +{ + callback_(message); } void GenericSubscription::handle_loaned_message( diff --git a/rclcpp/test/msg/String.msg b/rclcpp/test/msg/String.msg new file mode 100644 index 0000000000..44e5aaf86b --- /dev/null +++ b/rclcpp/test/msg/String.msg @@ -0,0 +1 @@ +string data \ No newline at end of file diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index f5206c1d05..610c2d68e0 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -7,6 +7,7 @@ add_definitions(-DTEST_RESOURCES_DIRECTORY="${TEST_RESOURCES_DIRECTORY}") rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs ../msg/Header.msg ../msg/MessageWithHeader.msg + ../msg/String.msg DEPENDENCIES builtin_interfaces LIBRARY_NAME ${PROJECT_NAME} SKIP_INSTALL @@ -355,6 +356,44 @@ if(TARGET test_publisher) ) target_link_libraries(test_publisher ${PROJECT_NAME} mimick) endif() + +set(append_library_dirs "${CMAKE_CURRENT_BINARY_DIR}") +if(WIN32) + set(append_library_dirs "${append_library_dirs}/$") +endif() + +ament_add_gtest(test_publisher_with_type_adapter test_publisher_with_type_adapter.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" +) +if(TARGET test_publisher_with_type_adapter) + ament_target_dependencies(test_publisher_with_type_adapter + "rcutils" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + rosidl_target_interfaces(test_publisher_with_type_adapter ${PROJECT_NAME}_test_msgs "rosidl_typesupport_cpp") + target_link_libraries(test_publisher_with_type_adapter ${PROJECT_NAME} mimick) +endif() + +ament_add_gtest(test_subscription_with_type_adapter test_subscription_with_type_adapter.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" +) +if(TARGET test_subscription_with_type_adapter) + ament_target_dependencies(test_subscription_with_type_adapter + "rcutils" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + rosidl_target_interfaces(test_subscription_with_type_adapter ${PROJECT_NAME}_test_msgs "rosidl_typesupport_cpp") + target_link_libraries(test_subscription_with_type_adapter ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_publisher_subscription_count_api test_publisher_subscription_count_api.cpp) if(TARGET test_publisher_subscription_count_api) ament_target_dependencies(test_publisher_subscription_count_api @@ -507,11 +546,6 @@ if(TARGET test_find_weak_nodes) target_link_libraries(test_find_weak_nodes ${PROJECT_NAME}) endif() -set(append_library_dirs "${CMAKE_CURRENT_BINARY_DIR}") -if(WIN32) - set(append_library_dirs "${append_library_dirs}/$") -endif() - ament_add_gtest(test_externally_defined_services test_externally_defined_services.cpp) ament_target_dependencies(test_externally_defined_services "rcl" diff --git a/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp b/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp index 16f1d92b86..bb59c83bb2 100644 --- a/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp +++ b/rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp @@ -71,6 +71,8 @@ class TestSubscription : public rclcpp::SubscriptionBase void handle_message(std::shared_ptr &, const rclcpp::MessageInfo &) override {} void handle_loaned_message(void *, const rclcpp::MessageInfo &) override {} + void handle_serialized_message( + const std::shared_ptr &, const rclcpp::MessageInfo &) override {} void return_message(std::shared_ptr &) override {} void return_serialized_message(std::shared_ptr &) override {} }; diff --git a/rclcpp/test/rclcpp/test_any_subscription_callback.cpp b/rclcpp/test/rclcpp/test_any_subscription_callback.cpp index b8d7fd9ba3..8e44eb71bf 100644 --- a/rclcpp/test/rclcpp/test_any_subscription_callback.cpp +++ b/rclcpp/test/rclcpp/test_any_subscription_callback.cpp @@ -67,7 +67,7 @@ void construct_with_null_allocator() TEST(AnySubscriptionCallback, null_allocator) { EXPECT_THROW( construct_with_null_allocator(), - std::runtime_error); + std::invalid_argument); } TEST_F(TestAnySubscriptionCallback, construct_destruct) { @@ -95,44 +95,69 @@ TEST_F(TestAnySubscriptionCallback, unset_dispatch_throw) { // Parameterized test to test across all callback types and dispatch types. // +// Type adapter to be used in tests. +struct MyEmpty {}; + +template<> +struct rclcpp::TypeAdapter +{ + using is_specialized = std::true_type; + using custom_type = MyEmpty; + using ros_message_type = test_msgs::msg::Empty; + + static + void + convert_to_ros_message(const custom_type &, ros_message_type &) + {} + + static + void + convert_to_custom(const ros_message_type &, custom_type &) + {} +}; + +using MyTA = rclcpp::TypeAdapter; + +template class InstanceContextImpl { public: InstanceContextImpl() = default; virtual ~InstanceContextImpl() = default; - explicit InstanceContextImpl(rclcpp::AnySubscriptionCallback asc) + explicit InstanceContextImpl(rclcpp::AnySubscriptionCallback asc) : any_subscription_callback_(asc) {} virtual - rclcpp::AnySubscriptionCallback + rclcpp::AnySubscriptionCallback get_any_subscription_callback_to_test() const { return any_subscription_callback_; } protected: - rclcpp::AnySubscriptionCallback any_subscription_callback_; + rclcpp::AnySubscriptionCallback any_subscription_callback_; }; +template class InstanceContext { public: - InstanceContext(const std::string & name, std::shared_ptr impl) + InstanceContext(const std::string & name, std::shared_ptr> impl) : name(name), impl_(impl) {} InstanceContext( const std::string & name, - rclcpp::AnySubscriptionCallback asc) - : name(name), impl_(std::make_shared(asc)) + rclcpp::AnySubscriptionCallback asc) + : name(name), impl_(std::make_shared>(asc)) {} InstanceContext(const InstanceContext & other) : InstanceContext(other.name, other.impl_) {} - rclcpp::AnySubscriptionCallback + rclcpp::AnySubscriptionCallback get_any_subscription_callback_to_test() const { return impl_->get_any_subscription_callback_to_test(); @@ -141,12 +166,17 @@ class InstanceContext std::string name; protected: - std::shared_ptr impl_; + std::shared_ptr> impl_; }; class DispatchTests : public TestAnySubscriptionCallback, - public ::testing::WithParamInterface + public ::testing::WithParamInterface> +{}; + +class DispatchTestsWithTA + : public TestAnySubscriptionCallback, + public ::testing::WithParamInterface> {}; auto @@ -155,57 +185,67 @@ format_parameter(const ::testing::TestParamInfo & info return info.param.name; } -// Testing dispatch with shared_ptr as input -TEST_P(DispatchTests, test_inter_shared_dispatch) { - auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); - any_subscription_callback_to_test.dispatch(msg_shared_ptr_, message_info_); +auto +format_parameter_with_ta(const ::testing::TestParamInfo & info) +{ + return info.param.name; } -// Testing dispatch with shared_ptr as input -TEST_P(DispatchTests, test_intra_shared_dispatch) { - auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); - any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); -} +#define PARAMETERIZED_TESTS(DispatchTests_name) \ + /* Testing dispatch with shared_ptr as input */ \ + TEST_P(DispatchTests_name, test_inter_shared_dispatch) { \ + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \ + any_subscription_callback_to_test.dispatch(msg_shared_ptr_, message_info_); \ + } \ + \ + /* Testing dispatch with shared_ptr as input */ \ + TEST_P(DispatchTests_name, test_intra_shared_dispatch) { \ + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \ + any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); \ + } \ + \ + /* Testing dispatch with unique_ptr as input */ \ + TEST_P(DispatchTests_name, test_intra_unique_dispatch) { \ + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \ + any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); \ + } -// Testing dispatch with unique_ptr as input -TEST_P(DispatchTests, test_intra_unique_dispatch) { - auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); - any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); -} +PARAMETERIZED_TESTS(DispatchTests) +PARAMETERIZED_TESTS(DispatchTestsWithTA) // Generic classes for testing callbacks using std::bind to class methods. -template -class BindContextImpl : public InstanceContextImpl +template +class BindContextImpl : public InstanceContextImpl { static constexpr size_t number_of_callback_args{sizeof...(CallbackArgs)}; public: - using InstanceContextImpl::InstanceContextImpl; + using InstanceContextImpl::InstanceContextImpl; virtual ~BindContextImpl() = default; void on_message(CallbackArgs ...) const {} - rclcpp::AnySubscriptionCallback + rclcpp::AnySubscriptionCallback get_any_subscription_callback_to_test() const override { if constexpr (number_of_callback_args == 1) { - return rclcpp::AnySubscriptionCallback().set( + return rclcpp::AnySubscriptionCallback().set( std::bind(&BindContextImpl::on_message, this, std::placeholders::_1) ); } else { - return rclcpp::AnySubscriptionCallback().set( + return rclcpp::AnySubscriptionCallback().set( std::bind(&BindContextImpl::on_message, this, std::placeholders::_1, std::placeholders::_2) ); } } }; -template -class BindContext : public InstanceContext +template +class BindContext : public InstanceContext { public: explicit BindContext(const std::string & name) - : InstanceContext(name, std::make_shared>()) + : InstanceContext(name, std::make_shared>()) {} }; @@ -232,13 +272,53 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::AnySubscriptionCallback().set( const_ref_w_info_free_func)}, // bind function - BindContext("bind_method"), - BindContext( + BindContext("bind_method"), + BindContext( "bind_method_with_info") ), format_parameter ); +void const_ref_ta_free_func(const MyEmpty &) {} +void const_ref_ta_w_info_free_func(const MyEmpty &, const rclcpp::MessageInfo &) {} + +INSTANTIATE_TEST_SUITE_P( + ConstRefTACallbackTests, + DispatchTestsWithTA, + ::testing::Values( + // lambda + InstanceContext{"lambda_ta", rclcpp::AnySubscriptionCallback().set( + [](const MyEmpty &) {})}, + InstanceContext{"lambda_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + [](const MyEmpty &, const rclcpp::MessageInfo &) {})}, + InstanceContext{"lambda", rclcpp::AnySubscriptionCallback().set( + [](const test_msgs::msg::Empty &) {})}, + InstanceContext{"lambda_with_info", + rclcpp::AnySubscriptionCallback().set( + [](const test_msgs::msg::Empty &, const rclcpp::MessageInfo &) {})}, + // free function + InstanceContext{"free_function_ta", rclcpp::AnySubscriptionCallback().set( + const_ref_ta_free_func)}, + InstanceContext{"free_function_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + const_ref_ta_w_info_free_func)}, + InstanceContext{"free_function", rclcpp::AnySubscriptionCallback().set( + const_ref_free_func)}, + InstanceContext{"free_function_with_info", + rclcpp::AnySubscriptionCallback().set( + const_ref_w_info_free_func)}, + // bind function + BindContext("bind_method_ta"), + BindContext( + "bind_method_ta_with_info"), + BindContext("bind_method"), + BindContext( + "bind_method_with_info") + ), + format_parameter_with_ta +); + // // Versions of `std::unique_ptr` // @@ -264,13 +344,56 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::AnySubscriptionCallback().set( unique_ptr_w_info_free_func)}, // bind function - BindContext>("bind_method"), - BindContext, const rclcpp::MessageInfo &>( - "bind_method_with_info") + BindContext>("bind_method"), + BindContext< + test_msgs::msg::Empty, + std::unique_ptr, + const rclcpp::MessageInfo & + >("bind_method_with_info") ), format_parameter ); +void unique_ptr_ta_free_func(std::unique_ptr) {} +void unique_ptr_ta_w_info_free_func(std::unique_ptr, const rclcpp::MessageInfo &) {} + +INSTANTIATE_TEST_SUITE_P( + UniquePtrCallbackTests, + DispatchTestsWithTA, + ::testing::Values( + // lambda + InstanceContext{"lambda_ta", rclcpp::AnySubscriptionCallback().set( + [](std::unique_ptr) {})}, + InstanceContext{"lambda_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + [](std::unique_ptr, const rclcpp::MessageInfo &) {})}, + InstanceContext{"lambda", rclcpp::AnySubscriptionCallback().set( + [](std::unique_ptr) {})}, + InstanceContext{"lambda_with_info", + rclcpp::AnySubscriptionCallback().set( + [](std::unique_ptr, const rclcpp::MessageInfo &) {})}, + // free function + InstanceContext{"free_function_ta", rclcpp::AnySubscriptionCallback().set( + unique_ptr_ta_free_func)}, + InstanceContext{"free_function_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + unique_ptr_ta_w_info_free_func)}, + InstanceContext{"free_function", rclcpp::AnySubscriptionCallback().set( + unique_ptr_free_func)}, + InstanceContext{"free_function_with_info", + rclcpp::AnySubscriptionCallback().set( + unique_ptr_w_info_free_func)}, + // bind function + BindContext>("bind_method_ta"), + BindContext, const rclcpp::MessageInfo &>( + "bind_method_ta_with_info"), + BindContext>("bind_method"), + BindContext, const rclcpp::MessageInfo &>( + "bind_method_with_info") + ), + format_parameter_with_ta +); + // // Versions of `std::shared_ptr` // @@ -296,13 +419,56 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::AnySubscriptionCallback().set( shared_const_ptr_w_info_free_func)}, // bind function - BindContext>("bind_method"), - BindContext, const rclcpp::MessageInfo &>( + BindContext>("bind_method"), + BindContext, + const rclcpp::MessageInfo &>( "bind_method_with_info") ), format_parameter ); +void shared_const_ptr_ta_free_func(std::shared_ptr) {} +void shared_const_ptr_ta_w_info_free_func( + std::shared_ptr, const rclcpp::MessageInfo &) +{} + +INSTANTIATE_TEST_SUITE_P( + SharedConstPtrCallbackTests, + DispatchTestsWithTA, + ::testing::Values( + // lambda + InstanceContext{"lambda_ta", rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr) {})}, + InstanceContext{"lambda_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr, const rclcpp::MessageInfo &) {})}, + InstanceContext{"lambda", rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr) {})}, + InstanceContext{"lambda_with_info", + rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr, const rclcpp::MessageInfo &) {})}, + // free function + InstanceContext{"free_function_ta", rclcpp::AnySubscriptionCallback().set( + shared_const_ptr_ta_free_func)}, + InstanceContext{"free_function_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + shared_const_ptr_ta_w_info_free_func)}, + InstanceContext{"free_function", rclcpp::AnySubscriptionCallback().set( + shared_const_ptr_free_func)}, + InstanceContext{"free_function_with_info", + rclcpp::AnySubscriptionCallback().set( + shared_const_ptr_w_info_free_func)}, + // bind function + BindContext>("bind_method_ta"), + BindContext, const rclcpp::MessageInfo &>( + "bind_method_ta_with_info"), + BindContext>("bind_method"), + BindContext, const rclcpp::MessageInfo &>( + "bind_method_with_info") + ), + format_parameter_with_ta +); + // // Versions of `const std::shared_ptr &` // @@ -328,13 +494,58 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::AnySubscriptionCallback().set( const_ref_shared_const_ptr_w_info_free_func)}, // bind function - BindContext &>("bind_method"), - BindContext &, const rclcpp::MessageInfo &>( + BindContext &>("bind_method"), + BindContext &, + const rclcpp::MessageInfo &>( "bind_method_with_info") ), format_parameter ); +void const_ref_shared_const_ptr_ta_free_func(const std::shared_ptr &) {} +void const_ref_shared_const_ptr_ta_w_info_free_func( + const std::shared_ptr &, const rclcpp::MessageInfo &) +{} + +INSTANTIATE_TEST_SUITE_P( + ConstRefSharedConstPtrCallbackTests, + DispatchTestsWithTA, + ::testing::Values( + // lambda + InstanceContext{"lambda_ta", rclcpp::AnySubscriptionCallback().set( + [](const std::shared_ptr &) {})}, + InstanceContext{"lambda_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + [](const std::shared_ptr &, const rclcpp::MessageInfo &) {})}, + InstanceContext{"lambda", rclcpp::AnySubscriptionCallback().set( + [](const std::shared_ptr &) {})}, + InstanceContext{"lambda_with_info", + rclcpp::AnySubscriptionCallback().set( + [](const std::shared_ptr &, const rclcpp::MessageInfo &) {})}, + // free function + InstanceContext{"free_function_ta", rclcpp::AnySubscriptionCallback().set( + const_ref_shared_const_ptr_ta_free_func)}, + InstanceContext{"free_function_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + const_ref_shared_const_ptr_ta_w_info_free_func)}, + InstanceContext{"free_function", rclcpp::AnySubscriptionCallback().set( + const_ref_shared_const_ptr_free_func)}, + InstanceContext{"free_function_with_info", + rclcpp::AnySubscriptionCallback().set( + const_ref_shared_const_ptr_w_info_free_func)}, + // bind function + BindContext &>("bind_method_ta"), + BindContext &, const rclcpp::MessageInfo &>( + "bind_method_ta_with_info"), + BindContext &>("bind_method"), + BindContext &, + const rclcpp::MessageInfo &>( + "bind_method_with_info") + ), + format_parameter_with_ta +); + // // Versions of `std::shared_ptr` // @@ -360,9 +571,52 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::AnySubscriptionCallback().set( shared_ptr_w_info_free_func)}, // bind function - BindContext>("bind_method"), - BindContext, const rclcpp::MessageInfo &>( + BindContext>("bind_method"), + BindContext, + const rclcpp::MessageInfo &>( "bind_method_with_info") ), format_parameter ); + +void shared_ptr_ta_free_func(std::shared_ptr) {} +void shared_ptr_ta_w_info_free_func( + std::shared_ptr, const rclcpp::MessageInfo &) +{} + +INSTANTIATE_TEST_SUITE_P( + SharedPtrCallbackTests, + DispatchTestsWithTA, + ::testing::Values( + // lambda + InstanceContext{"lambda_ta", rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr) {})}, + InstanceContext{"lambda_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr, const rclcpp::MessageInfo &) {})}, + InstanceContext{"lambda", rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr) {})}, + InstanceContext{"lambda_with_info", + rclcpp::AnySubscriptionCallback().set( + [](std::shared_ptr, const rclcpp::MessageInfo &) {})}, + // free function + InstanceContext{"free_function_ta", rclcpp::AnySubscriptionCallback().set( + shared_ptr_ta_free_func)}, + InstanceContext{"free_function_ta_with_info", + rclcpp::AnySubscriptionCallback().set( + shared_ptr_ta_w_info_free_func)}, + InstanceContext{"free_function", rclcpp::AnySubscriptionCallback().set( + shared_ptr_free_func)}, + InstanceContext{"free_function_with_info", + rclcpp::AnySubscriptionCallback().set( + shared_ptr_w_info_free_func)}, + // bind function + BindContext>("bind_method_ta"), + BindContext, const rclcpp::MessageInfo &>( + "bind_method_ta_with_info"), + BindContext>("bind_method"), + BindContext, const rclcpp::MessageInfo &>( + "bind_method_with_info") + ), + format_parameter_with_ta +); diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index ec7ccc7e5a..00350edeb0 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -136,7 +136,10 @@ namespace buffers { namespace mock { -template +template< + typename MessageT, + typename Alloc = std::allocator, + typename MessageDeleter = std::default_delete> class IntraProcessBuffer { public: @@ -220,12 +223,12 @@ template< typename MessageT, typename Alloc = std::allocator, typename Deleter = std::default_delete> -class SubscriptionIntraProcess : public SubscriptionIntraProcessBase +class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) + RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer) - SubscriptionIntraProcess() + SubscriptionIntraProcessBuffer() : take_shared_method(false) { buffer = std::make_unique>(); @@ -262,6 +265,25 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase typename rclcpp::experimental::buffers::mock::IntraProcessBuffer::UniquePtr buffer; }; +template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> +class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< + MessageT, + Alloc, + Deleter +> +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) + + SubscriptionIntraProcess() + : SubscriptionIntraProcessBuffer() + { + } +}; + } // namespace mock } // namespace experimental } // namespace rclcpp @@ -270,12 +292,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase #define RCLCPP__PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase #define IntraProcessBuffer mock::IntraProcessBuffer #define SubscriptionIntraProcessBase mock::SubscriptionIntraProcessBase +#define SubscriptionIntraProcessBuffer mock::SubscriptionIntraProcessBuffer #define SubscriptionIntraProcess mock::SubscriptionIntraProcess #include "../src/rclcpp/intra_process_manager.cpp" #undef Publisher @@ -307,7 +331,7 @@ void Publisher::publish(MessageUniquePtr msg) ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), - message_allocator_); + *message_allocator_); } } // namespace mock diff --git a/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp b/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp index 9bef3c2c0f..6d192ca86b 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp @@ -196,7 +196,6 @@ do_custom_allocator_test( test_msgs::msg::Empty, decltype(callback), SubscriptionAllocatorT, - CallbackMessageT, rclcpp::Subscription, rclcpp::message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, diff --git a/rclcpp/test/rclcpp/test_loaned_message.cpp b/rclcpp/test/rclcpp/test_loaned_message.cpp index c82ff5fc7c..01284a27c1 100644 --- a/rclcpp/test/rclcpp/test_loaned_message.cpp +++ b/rclcpp/test/rclcpp/test_loaned_message.cpp @@ -43,7 +43,42 @@ TEST_F(TestLoanedMessage, initialize) { auto node = std::make_shared("loaned_message_test_node"); auto pub = node->create_publisher("loaned_message_test_topic", 1); - auto loaned_msg = rclcpp::LoanedMessage(pub.get(), pub->get_allocator()); +// suppress deprecated function warning +#if !defined(_WIN32) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#else // !defined(_WIN32) +# pragma warning(push) +# pragma warning(disable: 4996) +#endif + + auto pub_allocator = pub->get_allocator(); + +// remove warning suppression +#if !defined(_WIN32) +# pragma GCC diagnostic pop +#else // !defined(_WIN32) +# pragma warning(pop) +#endif + +// suppress deprecated function warning +#if !defined(_WIN32) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#else // !defined(_WIN32) +# pragma warning(push) +# pragma warning(disable: 4996) +#endif + + auto loaned_msg = rclcpp::LoanedMessage(pub.get(), pub_allocator); + +// remove warning suppression +#if !defined(_WIN32) +# pragma GCC diagnostic pop +#else // !defined(_WIN32) +# pragma warning(pop) +#endif + ASSERT_TRUE(loaned_msg.is_valid()); loaned_msg.get().float32_value = 42.0f; ASSERT_EQ(42.0f, loaned_msg.get().float32_value); diff --git a/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp b/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp new file mode 100644 index 0000000000..2f5d540568 --- /dev/null +++ b/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp @@ -0,0 +1,335 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/loaned_message.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "../mocking_utils/patch.hpp" +#include "../utils/rclcpp_gtest_macros.hpp" + +#include "test_msgs/msg/empty.hpp" +#include "rclcpp/msg/string.hpp" + + +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + + +using namespace std::chrono_literals; + +static const int g_max_loops = 200; +static const std::chrono::milliseconds g_sleep_per_loop(10); + + +class TestPublisher : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + +protected: + void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) + { + node = std::make_shared("my_node", "/ns", node_options); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } +}; + +namespace rclcpp +{ + +template<> +struct TypeAdapter +{ + using is_specialized = std::true_type; + using custom_type = std::string; + using ros_message_type = rclcpp::msg::String; + + static void + convert_to_ros_message( + const custom_type & source, + ros_message_type & destination) + { + destination.data = source; + } + + static void + convert_to_custom( + const ros_message_type & source, + custom_type & destination) + { + destination = source.data; + } +}; + +// Throws in conversion +template<> +struct TypeAdapter +{ + using is_specialized = std::true_type; + using custom_type = int; + using ros_message_type = rclcpp::msg::String; + + static void + convert_to_ros_message( + const custom_type & source, + ros_message_type & destination) + { + (void) source; + (void) destination; + throw std::runtime_error("This should happen"); + } + + static void + convert_to_custom( + const ros_message_type & source, + custom_type & destination) + { + (void) source; + (void) destination; + } +}; + +} // namespace rclcpp + +/* + * Testing publisher creation signatures with a type adapter. + */ +TEST_F(TestPublisher, various_creation_signatures) { + for (auto is_intra_process : {true, false}) { + rclcpp::NodeOptions options; + options.use_intra_process_comms(is_intra_process); + initialize(options); + { + using StringTypeAdapter = rclcpp::TypeAdapter; + auto publisher = node->create_publisher("topic", 42); + (void)publisher; + } + { + using StringTypeAdapter = rclcpp::adapt_type::as; + auto publisher = node->create_publisher("topic", 42); + (void)publisher; + } + } +} + +/* + * Testing that conversion errors are passed up. + */ +TEST_F(TestPublisher, conversion_exception_is_passed_up) { + using BadStringTypeAdapter = rclcpp::TypeAdapter; + for (auto is_intra_process : {true, false}) { + rclcpp::NodeOptions options; + options.use_intra_process_comms(is_intra_process); + initialize(options); + auto pub = node->create_publisher("topic_name", 1); + EXPECT_THROW(pub->publish(1), std::runtime_error); + } +} + +/* + * Testing that publisher sends type adapted types and ROS message types with intra proccess communications. + */ +TEST_F( + CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION), + check_type_adapted_message_is_sent_and_received_intra_process) { + using StringTypeAdapter = rclcpp::TypeAdapter; + const std::string message_data = "Message Data"; + const std::string topic_name = "topic_name"; + bool is_received; + + auto callback = + [message_data, &is_received]( + const rclcpp::msg::String::SharedPtr msg, + const rclcpp::MessageInfo & message_info + ) -> void + { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + + auto node = rclcpp::Node::make_shared( + "test_intra_process", + rclcpp::NodeOptions().use_intra_process_comms(true)); + auto pub = node->create_publisher(topic_name, 10); + auto sub = node->create_subscription(topic_name, 1, callback); + + auto wait_for_message_to_be_received = [&is_received, &node]() { + rclcpp::executors::SingleThreadedExecutor executor; + int i = 0; + executor.add_node(node); + executor.spin_once(std::chrono::milliseconds(0)); + while (!is_received && i < g_max_loops) { + printf("spin_node_once() - callback (1) expected - try %d/%d\n", ++i, g_max_loops); + executor.spin_once(g_sleep_per_loop); + } + }; + { + { // std::string passed by reference + is_received = false; + pub->publish(message_data); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(); + ASSERT_TRUE(is_received); + } + { // unique pointer to std::string + is_received = false; + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(); + ASSERT_TRUE(is_received); + } + { // ROS message passed by reference + is_received = false; + rclcpp::msg::String msg; + msg.data = message_data; + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(); + ASSERT_TRUE(is_received); + } + { // unique ptr to ROS message + is_received = false; + auto pu_msg = std::make_unique(); + pu_msg->data = message_data; + pub->publish(std::move(pu_msg)); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(); + ASSERT_TRUE(is_received); + } + /* TODO(audrow) Enable once loaned messages are supported for intra process communication + { // loaned ROS message + // is_received = false; + // std::allocator allocator; + // rclcpp::LoanedMessage loaned_msg(*pub, allocator); + // loaned_msg.get().data = message_data; + // pub->publish(std::move(loaned_msg)); + // ASSERT_FALSE(is_received); + // wait_for_message_to_be_received(); + // ASSERT_TRUE(is_received); + } + */ + } +} + +/* + * Testing that publisher sends type adapted types and ROS message types with inter proccess communications. + */ +TEST_F(TestPublisher, check_type_adapted_message_is_sent_and_received) { + using StringTypeAdapter = rclcpp::TypeAdapter; + + initialize(); + + const std::string message_data = "Message Data"; + const std::string topic_name = "topic_name"; + + auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto pub = node->create_publisher(topic_name, 1); + auto sub = node->create_subscription(topic_name, 1, do_nothing); + + auto assert_no_message_was_received_yet = [sub]() { + rclcpp::msg::String msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + }; + auto assert_message_was_received = [sub, message_data]() { + rclcpp::msg::String msg; + rclcpp::MessageInfo msg_info; + bool message_received = false; + auto start = std::chrono::steady_clock::now(); + do { + message_received = sub->take(msg, msg_info); + std::this_thread::sleep_for(100ms); + } while (!message_received && std::chrono::steady_clock::now() - start < 10s); + EXPECT_TRUE(message_received); + ASSERT_STREQ(message_data.c_str(), msg.data.c_str()); + }; + + { // std::string passed by reference + assert_no_message_was_received_yet(); + pub->publish(message_data); + assert_message_was_received(); + } + { // unique pointer to std::string + assert_no_message_was_received_yet(); + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + assert_message_was_received(); + } + { // ROS message passed by reference + assert_no_message_was_received_yet(); + rclcpp::msg::String msg; + msg.data = message_data; + pub->publish(msg); + assert_message_was_received(); + } + { // unique ptr to ROS message + assert_no_message_was_received_yet(); + auto pu_msg = std::make_unique(); + pu_msg->data = message_data; + pub->publish(std::move(pu_msg)); + assert_message_was_received(); + } + { // loaned ROS message + assert_no_message_was_received_yet(); + std::allocator allocator; + rclcpp::LoanedMessage loaned_msg(*pub, allocator); + loaned_msg.get().data = message_data; + pub->publish(std::move(loaned_msg)); + rclcpp::PublisherOptionsWithAllocator> options; + assert_message_was_received(); + } +} diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 9cf9acdcf8..2895865c1f 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -510,7 +510,7 @@ INSTANTIATE_TEST_SUITE_P( TEST_F(TestSubscription, get_network_flow_endpoints_errors) { initialize(); const rclcpp::QoS subscription_qos(1); - auto subscription_callback = [](const test_msgs::msg::Empty::SharedPtr msg) { + auto subscription_callback = [](test_msgs::msg::Empty::SharedPtr msg) { (void)msg; }; auto subscription = node->create_subscription( diff --git a/rclcpp/test/rclcpp/test_subscription_with_type_adapter.cpp b/rclcpp/test/rclcpp/test_subscription_with_type_adapter.cpp new file mode 100644 index 0000000000..54c11bf1b1 --- /dev/null +++ b/rclcpp/test/rclcpp/test_subscription_with_type_adapter.cpp @@ -0,0 +1,563 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/loaned_message.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "../mocking_utils/patch.hpp" +#include "../utils/rclcpp_gtest_macros.hpp" + +#include "test_msgs/msg/empty.hpp" +#include "rclcpp/msg/string.hpp" + + +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + + +using namespace std::chrono_literals; + +static const int g_max_loops = 200; +static const std::chrono::milliseconds g_sleep_per_loop(10); + + +class TestSubscription : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + +protected: + void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) + { + node = std::make_shared("my_node", "/ns", node_options); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } +}; + +namespace rclcpp +{ + +template<> +struct TypeAdapter +{ + using is_specialized = std::true_type; + using custom_type = std::string; + using ros_message_type = rclcpp::msg::String; + + static void + convert_to_ros_message( + const custom_type & source, + ros_message_type & destination) + { + destination.data = source; + } + + static void + convert_to_custom( + const ros_message_type & source, + custom_type & destination) + { + destination = source.data; + } +}; + +} // namespace rclcpp + +void wait_for_message_to_be_received( + bool & is_received, + const std::shared_ptr & node) +{ + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + executor.spin_once(std::chrono::milliseconds(0)); + int i = 0; + while (!is_received && i < g_max_loops) { + printf("spin_node_once() - callback (1) expected - try %d/%d\n", ++i, g_max_loops); + executor.spin_once(g_sleep_per_loop); + } +} +/* + * Testing publisher creation signatures with a type adapter. + */ +TEST_F(TestSubscription, various_creation_signatures) { + initialize(); + { + using StringTypeAdapter = rclcpp::TypeAdapter; + auto sub = + node->create_subscription("topic", 42, [](const std::string &) {}); + (void)sub; + } + { + using StringTypeAdapter = rclcpp::adapt_type::as; + auto sub = + node->create_subscription("topic", 42, [](const std::string &) {}); + (void)sub; + } +} + +/* + * Testing that subscriber receives type adapted types and ROS message types with intra proccess communications. + */ +TEST_F( + CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION), + check_type_adapted_messages_are_received_by_intra_process_subscription) { + using StringTypeAdapter = rclcpp::TypeAdapter; + const std::string message_data = "Message Data"; + const std::string topic_name = "topic_name"; + + auto node = rclcpp::Node::make_shared( + "test_intra_process", + rclcpp::NodeOptions().use_intra_process_comms(true)); + auto pub = node->create_publisher(topic_name, 1); + + rclcpp::msg::String msg; + msg.data = message_data; + + { + { // const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // const std::string & with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::shared_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::shared_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::unique_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // const rclcpp::msg::String & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const rclcpp::msg::String & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.data.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // const rclcpp::msg::String & with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + const rclcpp::msg::String & msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.data.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::shared_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::shared_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::unique_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + } +} + +/* + * Testing that subscriber receives type adapted types and ROS message types with inter proccess communications. + */ +TEST_F( + CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION), + check_type_adapted_messages_are_received_by_inter_process_subscription) { + using StringTypeAdapter = rclcpp::TypeAdapter; + const std::string message_data = "Message Data"; + const std::string topic_name = "topic_name"; + + auto node = rclcpp::Node::make_shared( + "test_intra_process", + rclcpp::NodeOptions().use_intra_process_comms(false)); + auto pub = node->create_publisher(topic_name, 1); + + rclcpp::msg::String msg; + msg.data = message_data; + + { + { // const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received](const std::string & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // const std::string & with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg, const rclcpp::MessageInfo & message_info) -> void + { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::shared_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::shared_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::unique_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // const rclcpp::msg::String & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const rclcpp::msg::String & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.data.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // const rclcpp::msg::String & with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + const rclcpp::msg::String & msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.data.c_str()); + ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::shared_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::shared_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // std::unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // std::unique_ptr with message info + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg->data.c_str()); + ASSERT_FALSE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + pub->publish(msg); + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + } +} diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp index d81750ba08..1c3379712e 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp @@ -230,13 +230,8 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, - typename CallbackMessageT = - typename rclcpp::subscription_traits::has_message_type::type, typename SubscriptionT = rclcpp::Subscription, - typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, - AllocatorT - >> + typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType> std::shared_ptr create_subscription( const std::string & topic_name, diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp index 77598fdcb7..a7f1c686bc 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp @@ -61,7 +61,6 @@ template< typename MessageT, typename CallbackT, typename AllocatorT, - typename CallbackMessageT, typename SubscriptionT, typename MessageMemoryStrategyT> std::shared_ptr