Skip to content

Commit

Permalink
[rclcpp] Type Adaptation feature (#1557)
Browse files Browse the repository at this point in the history
* initial version of type_adaptor.hpp

Signed-off-by: William Woodall <william@osrfoundation.org>

* initial version of rclcpp::get_message_type_support_handle()

Signed-off-by: William Woodall <william@osrfoundation.org>

* initial version of rclcpp::is_ros_compatible_type check

Signed-off-by: William Woodall <william@osrfoundation.org>

* fixup include statement order in publisher.hpp

Signed-off-by: William Woodall <william@osrfoundation.org>

* use new rclcpp::get_message_type_support_handle() and check in Publisher

Signed-off-by: William Woodall <william@osrfoundation.org>

* update adaptor->adapter, update TypeAdapter to use two arguments, add implicit default

Signed-off-by: William Woodall <william@osrfoundation.org>

* move away from shared_ptr<allocator> to just allocator, like the STL

Signed-off-by: William Woodall <william@osrfoundation.org>

* fixes to TypeAdapter and adding new publish function signatures

Signed-off-by: William Woodall <william@osrfoundation.org>

* bugfixes

Signed-off-by: William Woodall <william@osrfoundation.org>

* more bugfixes

Signed-off-by: William Woodall <william@osrfoundation.org>

* Add nullptr check

Signed-off-by: Audrow Nash <audrow@hey.com>

* Remove public from struct inheritance

Signed-off-by: Audrow Nash <audrow@hey.com>

* Add tests for publisher with type adapter

Signed-off-by: Audrow Nash <audrow@hey.com>

* Update packages to C++17

Signed-off-by: Audrow Nash <audrow@hey.com>

* Revert "Update packages to C++17"

This reverts commit 4585605.

Signed-off-by: William Woodall <william@osrfoundation.org>

* Begin updating AnySubscriptionCallback to use the TypeAdapter

Signed-off-by: Audrow Nash <audrow@hey.com>

* Use type adapter's custom type

Signed-off-by: Audrow Nash <audrow@hey.com>

* Correct which AnySubscriptionCallbackHelper is selected

Signed-off-by: Audrow Nash <audrow@hey.com>

* Setup dispatch function to work with adapted types

Signed-off-by: Audrow Nash <audrow@hey.com>

* Improve template logic on dispatch methods

Signed-off-by: Audrow Nash <audrow@hey.com>

* implement TypeAdapter for Subscription

Signed-off-by: William Woodall <william@osrfoundation.org>

* Add intraprocess tests with all supported message types

Signed-off-by: Audrow Nash <audrow@hey.com>

* Add intra process tests

Signed-off-by: Audrow Nash <audrow@hey.com>

* Add tests for subscription with type adapter

Signed-off-by: Audrow Nash <audrow@hey.com>

* Fix null allocator test

Signed-off-by: Audrow Nash <audrow@hey.com>

* Handle serialized message correctly

Signed-off-by: Audrow Nash <audrow@hey.com>

* Fix generic subscription

Signed-off-by: Audrow Nash <audrow@hey.com>

* Fix trailing space

Signed-off-by: Audrow Nash <audrow@hey.com>

* fix some issues found while testing type_adapter in demos

Signed-off-by: William Woodall <william@osrfoundation.org>

* add more tests, WIP

Signed-off-by: William Woodall <william@osrfoundation.org>

* Improve pub/sub tests

Signed-off-by: Audrow Nash <audrow@hey.com>

* Apply uncrustify formatting

Signed-off-by: Audrow Nash <audrow@hey.com>

* finish new tests for any subscription callback with type adapter

Signed-off-by: William Woodall <william@osrfoundation.org>

* fix adapt_type<...>::as<...> syntax

Signed-off-by: William Woodall <william@osrfoundation.org>

* fix explicit template instantiation of create_subscription() in new test

Signed-off-by: William Woodall <william@osrfoundation.org>

* cpplint fix

Signed-off-by: William Woodall <william@osrfoundation.org>

* Fix bug by aligning allocator types on both sides of ipm

Signed-off-by: Audrow Nash <audrow@hey.com>

* Fix intra process manager tests

Signed-off-by: Audrow Nash <audrow@hey.com>

Co-authored-by: Audrow Nash <audrow@hey.com>
  • Loading branch information
wjwwood and paudrow authored May 7, 2021
1 parent 0659d82 commit 2e4c97a
Show file tree
Hide file tree
Showing 31 changed files with 2,792 additions and 368 deletions.
730 changes: 627 additions & 103 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp

Large diffs are not rendered by default.

33 changes: 11 additions & 22 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ template<
typename MessageT,
typename CallbackT,
typename AllocatorT,
typename CallbackMessageT,
typename SubscriptionT,
typename MessageMemoryStrategyT,
typename NodeParametersT,
typename NodeTopicsT>
typename NodeTopicsT,
typename ROSMessageType = typename SubscriptionT::ROSMessageType>
typename std::shared_ptr<SubscriptionT>
create_subscription(
NodeParametersT & node_parameters,
Expand All @@ -70,7 +70,7 @@ create_subscription(
using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics_interface = get_node_topics_interface(node_topics);

std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>
subscription_topic_stats = nullptr;

if (rclcpp::detail::resolve_enable_topic_statistics(
Expand All @@ -92,11 +92,11 @@ create_subscription(
qos);

subscription_topic_stats = std::make_shared<
rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>
rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>
>(node_topics_interface->get_node_base_interface()->get_name(), publisher);

std::weak_ptr<
rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>
rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>
> weak_subscription_topic_stats(subscription_topic_stats);
auto sub_call_back = [weak_subscription_topic_stats]() {
auto subscription_topic_stats = weak_subscription_topic_stats.lock();
Expand Down Expand Up @@ -153,7 +153,6 @@ create_subscription(
* \tparam MessageT
* \tparam CallbackT
* \tparam AllocatorT
* \tparam CallbackMessageT
* \tparam SubscriptionT
* \tparam MessageMemoryStrategyT
* \tparam NodeT
Expand All @@ -171,13 +170,8 @@ template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>,
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType,
typename NodeT>
typename std::shared_ptr<SubscriptionT>
create_subscription(
Expand All @@ -194,7 +188,7 @@ create_subscription(
)
{
return rclcpp::detail::create_subscription<
MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT>(
MessageT, CallbackT, AllocatorT, SubscriptionT, MessageMemoryStrategyT>(
node, node, topic_name, qos, std::forward<CallbackT>(callback), options, msg_mem_strat);
}

Expand All @@ -206,13 +200,8 @@ template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>>
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType>
typename std::shared_ptr<SubscriptionT>
create_subscription(
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr & node_parameters,
Expand All @@ -229,7 +218,7 @@ create_subscription(
)
{
return rclcpp::detail::create_subscription<
MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT>(
MessageT, CallbackT, AllocatorT, SubscriptionT, MessageMemoryStrategyT>(
node_parameters, node_topics, topic_name, qos,
std::forward<CallbackT>(callback), options, msg_mem_strat);
}
Expand Down
23 changes: 12 additions & 11 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/experimental/subscription_intra_process.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/logger.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
Expand Down Expand Up @@ -181,7 +182,7 @@ class IntraProcessManager
do_intra_process_publish(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
Expand Down Expand Up @@ -226,7 +227,7 @@ class IntraProcessManager
{
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
Expand All @@ -243,7 +244,7 @@ class IntraProcessManager
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
Expand Down Expand Up @@ -271,7 +272,7 @@ class IntraProcessManager
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);

if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
Expand Down Expand Up @@ -368,12 +369,12 @@ class IntraProcessManager
auto subscription_base = subscription_it->second.subscription.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
>(subscription_base);
if (nullptr == subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
}
Expand All @@ -393,7 +394,7 @@ class IntraProcessManager
add_owned_msg_to_buffers(
std::unique_ptr<MessageT, Deleter> message,
std::vector<uint64_t> subscription_ids,
std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
Expand All @@ -406,12 +407,12 @@ class IntraProcessManager
auto subscription_base = subscription_it->second.subscription.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
>(subscription_base);
if (nullptr == subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
}
Expand All @@ -423,8 +424,8 @@ class IntraProcessManager
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
MessageAllocTraits::construct(*allocator.get(), ptr, *message);
auto ptr = MessageAllocTraits::allocate(allocator, 1);
MessageAllocTraits::construct(allocator, ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

subscription->provide_intra_process_message(std::move(copy_message));
Expand Down
111 changes: 29 additions & 82 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand All @@ -44,21 +45,27 @@ template<
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>,
typename CallbackMessageT = MessageT>
class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
class SubscriptionIntraProcess
: public SubscriptionIntraProcessBuffer<
MessageT,
Alloc,
Deleter
>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)

using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;

using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer<
MessageT,
Alloc,
Deleter
>::UniquePtr;
>;

public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)

using MessageAllocTraits = typename SubscriptionIntraProcessBufferT::MessageAllocTraits;
using MessageAlloc = typename SubscriptionIntraProcessBufferT::MessageAlloc;
using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstMessageSharedPtr;
using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::MessageUniquePtr;
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;

SubscriptionIntraProcess(
AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
Expand All @@ -67,31 +74,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBase(topic_name, qos_profile),
: SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(
allocator,
context,
topic_name,
qos_profile,
buffer_type),
any_callback_(callback)
{
if (!std::is_same<MessageT, CallbackMessageT>::value) {
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}

// Create the intra-process buffer.
buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
buffer_type,
qos_profile,
allocator);

// Create the guard condition.
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options();

gc_ = rcl_get_zero_initialized_guard_condition();
rcl_ret_t ret = rcl_guard_condition_init(
&gc_, context->get_rcl_context().get(), guard_condition_options);

if (RCL_RET_OK != ret) {
throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
}

TRACEPOINT(
rclcpp_subscription_callback_added,
static_cast<const void *>(this),
Expand All @@ -104,22 +94,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
#endif
}

~SubscriptionIntraProcess()
{
if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy guard condition: %s",
rcutils_get_error_string().str);
}
}

bool
is_ready(rcl_wait_set_t * wait_set)
{
(void) wait_set;
return buffer_->has_data();
}
virtual ~SubscriptionIntraProcess() = default;

std::shared_ptr<void>
take_data()
Expand All @@ -128,9 +103,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
MessageUniquePtr unique_msg;

if (any_callback_.use_take_shared_method()) {
shared_msg = buffer_->consume_shared();
shared_msg = this->buffer_->consume_shared();
} else {
unique_msg = buffer_->consume_unique();
unique_msg = this->buffer_->consume_unique();
}
return std::static_pointer_cast<void>(
std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
Expand All @@ -141,37 +116,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase

void execute(std::shared_ptr<void> & data)
{
execute_impl<CallbackMessageT>(data);
}

void
provide_intra_process_message(ConstMessageSharedPtr message)
{
buffer_->add_shared(std::move(message));
trigger_guard_condition();
}

void
provide_intra_process_message(MessageUniquePtr message)
{
buffer_->add_unique(std::move(message));
trigger_guard_condition();
}

bool
use_take_shared_method() const
{
return buffer_->use_take_shared_method();
}

private:
void
trigger_guard_condition()
{
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
(void)ret;
execute_impl<MessageT>(data);
}

protected:
template<typename T>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(std::shared_ptr<void> & data)
Expand Down Expand Up @@ -206,7 +154,6 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
}

AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
BufferUniquePtr buffer_;
};

} // namespace experimental
Expand Down
Loading

0 comments on commit 2e4c97a

Please sign in to comment.