From de9fc8775fd385aaf9220e7d9b4529ac82c7f751 Mon Sep 17 00:00:00 2001 From: Shoji Morita Date: Tue, 20 Jun 2023 20:17:16 +0900 Subject: [PATCH] Decoupled the additional feature from rclcpp to rcpputils, reflecting on the pointing out below. https://github.com/ros2/rclcpp/pull/2205#issuecomment-1593832758 Signed-off-by: Shoji Morita --- CMakeLists.txt | 7 + include/rcpputils/threads.hpp | 26 +++ .../rcpputils/threads/posix/linux/cpu_set.hpp | 160 +++++++++++++ include/rcpputils/threads/posix/thread.hpp | 210 ++++++++++++++++++ .../threads/posix/thread_attribute.hpp | 190 ++++++++++++++++ .../rcpputils/threads/posix/thread_func.hpp | 53 +++++ include/rcpputils/threads/posix/thread_id.hpp | 146 ++++++++++++ include/rcpputils/threads/posix/utilities.hpp | 42 ++++ include/rcpputils/threads/std/thread.hpp | 145 ++++++++++++ .../threads/std/thread_attribute.hpp | 165 ++++++++++++++ include/rcpputils/threads/windows/thread.hpp | 22 ++ src/threads/posix_thread.cpp | 183 +++++++++++++++ src/threads/windows_thread.cpp | 19 ++ 13 files changed, 1368 insertions(+) create mode 100644 include/rcpputils/threads.hpp create mode 100644 include/rcpputils/threads/posix/linux/cpu_set.hpp create mode 100644 include/rcpputils/threads/posix/thread.hpp create mode 100644 include/rcpputils/threads/posix/thread_attribute.hpp create mode 100644 include/rcpputils/threads/posix/thread_func.hpp create mode 100644 include/rcpputils/threads/posix/thread_id.hpp create mode 100644 include/rcpputils/threads/posix/utilities.hpp create mode 100644 include/rcpputils/threads/std/thread.hpp create mode 100644 include/rcpputils/threads/std/thread_attribute.hpp create mode 100644 include/rcpputils/threads/windows/thread.hpp create mode 100644 src/threads/posix_thread.cpp create mode 100644 src/threads/windows_thread.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b2a6772..f8841e3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,13 @@ add_library(${PROJECT_NAME} target_include_directories(${PROJECT_NAME} PUBLIC "$" "$") +if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + target_sources(${PROJECT_NAME} PRIVATE + src/threads/posix_thread.cpp) +elseif(WIN32) + target_sources(${PROJECT_NAME} PRIVATE + src/threads/windows_thread.cpp) +endif() if(WIN32) target_compile_definitions(${PROJECT_NAME} PRIVATE "RCPPUTILS_BUILDING_LIBRARY") diff --git a/include/rcpputils/threads.hpp b/include/rcpputils/threads.hpp new file mode 100644 index 0000000..942fd6a --- /dev/null +++ b/include/rcpputils/threads.hpp @@ -0,0 +1,26 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS_HPP_ +#define RCPPUTILS__THREADS_HPP_ + +#if defined(__linux__) +#include "rcpputils/threads/posix/thread.hpp" +#elif defined(_WIN32) +#include "rcpputils/threads/windows/thread.hpp" +#else +#include "rcpputils/threads/std/thread.hpp" +#endif + +#endif // RCPPUTILS__THREADS_HPP_ diff --git a/include/rcpputils/threads/posix/linux/cpu_set.hpp b/include/rcpputils/threads/posix/linux/cpu_set.hpp new file mode 100644 index 0000000..b227520 --- /dev/null +++ b/include/rcpputils/threads/posix/linux/cpu_set.hpp @@ -0,0 +1,160 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__POSIX__LINUX__CPU_SET_HPP_ +#define RCPPUTILS__THREADS__POSIX__LINUX__CPU_SET_HPP_ + +#include +#include +#include +#include +#include + +#include "rcpputils/visibility_control.hpp" +#include "rcpputils/threads/posix/utilities.hpp" + +namespace rcpputils +{ + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = cpu_set_t; + CpuSet() = default; + explicit CpuSet(std::size_t cpu) + { + init_cpu_set(); + CPU_ZERO_S(alloc_size(), cpu_set_.get()); + CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); + } + CpuSet(const CpuSet & other) + { + if (other.cpu_set_) { + init_cpu_set(); + memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); + } + } + CpuSet & operator=(const CpuSet & other) + { + if (other.cpu_set_) { + init_cpu_set(); + memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); + } else { + clear(); + } + return *this; + } + CpuSet(CpuSet && other) + : CpuSet() + { + swap(other); + } + CpuSet & operator=(CpuSet && other) + { + CpuSet tmp; + other.swap(tmp); + tmp.swap(*this); + return *this; + } + void swap(CpuSet & other) + { + using std::swap; + swap(cpu_set_, other.cpu_set_); + swap(num_proc_, other.num_proc_); + } + void set(std::size_t cpu) + { + init_cpu_set(); + valid_cpu(cpu); + CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); + } + void unset(std::size_t cpu) + { + init_cpu_set(); + valid_cpu(cpu); + CPU_CLR_S(cpu, alloc_size(), cpu_set_.get()); + } + void clear() + { + if (cpu_set_) { + CPU_ZERO_S(alloc_size(), cpu_set_.get()); + } + } + bool is_set(std::size_t cpu) const + { + if (cpu_set_) { + valid_cpu(cpu); + return CPU_ISSET_S(cpu, alloc_size(), cpu_set_.get()); + } else { + return false; + } + } + + std::size_t max_processors() const + { + return num_proc_; + } + std::size_t alloc_size() const + { + return CPU_ALLOC_SIZE(num_proc_); + } + NativeCpuSetType * native_cpu_set() const + { + return cpu_set_.get(); + } + +private: + void init_cpu_set() + { + if (cpu_set_) { + return; + } + auto num_proc = sysconf(_SC_NPROCESSORS_ONLN); + if (num_proc <= 0) { + throw_if_error(num_proc, "unrecognized sysconf(_SC_NPROCESSORS_ONLN) is not valid"); + } + auto p = CPU_ALLOC(CPU_ALLOC_SIZE(num_proc)); + cpu_set_ = std::unique_ptr(p); + num_proc_ = num_proc; + } + void valid_cpu(std::size_t cpu) const + { + if (num_proc_ <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + } + struct CpuSetDeleter + { + void operator()(NativeCpuSetType * cpu_set) const + { + CPU_FREE(cpu_set); + } + }; + std::unique_ptr cpu_set_; + std::size_t num_proc_; +}; + +inline void swap(CpuSet & a, CpuSet & b) +{ + a.swap(b); +} + +} // namespace detail + +} // namespace rcpputils + +#endif // RCPPUTILS__THREADS__POSIX__LINUX__CPU_SET_HPP_ diff --git a/include/rcpputils/threads/posix/thread.hpp b/include/rcpputils/threads/posix/thread.hpp new file mode 100644 index 0000000..37c15cf --- /dev/null +++ b/include/rcpputils/threads/posix/thread.hpp @@ -0,0 +1,210 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__POSIX__THREAD_HPP_ +#define RCPPUTILS__THREADS__POSIX__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rcpputils/threads/posix/thread_attribute.hpp" +#include "rcpputils/threads/posix/thread_func.hpp" +#include "rcpputils/threads/posix/thread_id.hpp" +#include "rcpputils/threads/posix/utilities.hpp" +#include "rcpputils/visibility_control.hpp" + +namespace rcpputils +{ + +RCPPUTILS_PUBLIC_TYPE +struct Thread +{ + using NativeHandleType = pthread_t; + using Attribute = detail::ThreadAttribute; + using Id = detail::ThreadId; + + // Assume pthread_t is an invalid handle if it's 0 + Thread() noexcept + : handle_{} {} + Thread(Thread && other) + : handle_{} + { + swap(other); + } + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : Thread( + static_cast(nullptr), + make_thread_func(std::forward(f), std::forward(args)...)) + {} + template + Thread(Attribute const & attr, F && f, Args && ... args) + : Thread( + &attr, + make_thread_func_with_attr(attr, std::forward(f), std::forward(args)...)) + {} + Thread(Thread const &) = delete; + ~Thread() + { + if (handle_) { + std::terminate(); + } + } + + Thread & operator=(Thread && other) noexcept + { + if (handle_) { + std::terminate(); + } + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(handle_, other.handle_); + swap(name_, other.name_); + } + + void join() + { + void * p; + int r = pthread_join(handle_, &p); + detail::throw_if_error(r, "Error in pthread_join "); + handle_ = NativeHandleType{}; + } + + bool joinable() const noexcept + { + return 0 == pthread_equal(handle_, NativeHandleType{}); + } + + void detach() + { + int r = pthread_detach(handle_); + detail::throw_if_error(r, "Error in pthread_detach "); + handle_ = NativeHandleType{}; + } + + NativeHandleType native_handle() const + { + return handle_; + } + + Id get_id() const noexcept + { + return Id{handle_}; + } + + static unsigned int hardware_concurrency() noexcept + { + auto r = sysconf(_SC_NPROCESSORS_ONLN); + if (r == -1) { + return 0u; + } else { + return static_cast(r); + } + } + +private: + using ThreadFuncBase = detail::ThreadFuncBase; + template + static std::unique_ptr make_thread_func(F && f, Args && ... args) + { + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::ThreadFuncBase * func = new detail::ThreadFunc( + [f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable + { + std::apply(f, args); + }); + return std::unique_ptr(func); + } + template + static std::unique_ptr make_thread_func_with_attr( + Attribute const & attr, + F && f, + Args && ... args) + { + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::ThreadFuncBase * func = new detail::ThreadFunc( + [attr, f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable + { + std::apply(f, args); + }); + return std::unique_ptr(func); + } + + Thread(Attribute const * attr, std::unique_ptr func); + + static void apply_attr(Attribute const & attr); + + NativeHandleType handle_; + std::string name_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace detail +{ +void apply_attr_to_current_thread(ThreadAttribute const & attr); +} + +namespace this_thread +{ + +template +void run_with_thread_attribute( + detail::ThreadAttribute const & attr, F && f, Args && ... args) +{ + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::apply_attr_to_current_thread(attr); + std::invoke(std::forward(f), std::forward(args)...); +} + +inline void yield() noexcept +{ + sched_yield(); +} + +} // namespace this_thread + +} // namespace rcpputils + +#endif // RCPPUTILS__THREADS__POSIX__THREAD_HPP_ diff --git a/include/rcpputils/threads/posix/thread_attribute.hpp b/include/rcpputils/threads/posix/thread_attribute.hpp new file mode 100644 index 0000000..e9f2925 --- /dev/null +++ b/include/rcpputils/threads/posix/thread_attribute.hpp @@ -0,0 +1,190 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ +#define RCPPUTILS__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ + +#include +#include +#include + +#include "rcutils/thread_attr.h" + +#include "rcpputils/visibility_control.hpp" + +#ifdef __linux__ +#include "rcpputils/threads/posix/linux/cpu_set.hpp" +#endif + +namespace rcpputils +{ + +namespace detail +{ + +struct ThreadAttribute +{ + ThreadAttribute(); + + ThreadAttribute(const ThreadAttribute &) = default; + ThreadAttribute(ThreadAttribute &&) = default; + ThreadAttribute & operator=(const ThreadAttribute &) = default; + ThreadAttribute & operator=(ThreadAttribute &&) = default; + + using NativeAttributeType = pthread_attr_t; + + ThreadAttribute & set_affinity(CpuSet cs) + { + cpu_set_ = std::move(cs); + return *this; + } + const CpuSet & get_affinity() const + { + return cpu_set_; + } + + ThreadAttribute & set_sched_policy(rcutils_thread_scheduling_policy_t sp) + { + sched_policy_ = convert_sched_policy(sp); + return *this; + } + int get_sched_policy() const + { + return sched_policy_; + } + + ThreadAttribute & set_stack_size(std::size_t sz) + { + stack_size_ = sz; + return *this; + } + std::size_t get_stack_size() const + { + return stack_size_; + } + + ThreadAttribute & set_priority(int prio) + { + priority_ = prio; + return *this; + } + int get_priority() const + { + return priority_; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + detached_flag_ = detach; + return *this; + } + bool get_run_as_detached() const + { + return detached_flag_; + } + + ThreadAttribute & set_name(std::string name) + { + name_ = std::move(name); + return *this; + } + const std::string & get_name() const + { + return name_; + } + + void + set_thread_attribute( + const rcutils_thread_attr_t & attr) + { + CpuSet cpu_set(attr.core_affinity); + set_affinity(std::move(cpu_set)); + set_sched_policy(attr.scheduling_policy); + set_priority(attr.priority); + set_name(attr.name); + } + + void + swap( + ThreadAttribute & other) + { + using std::swap; + swap(cpu_set_, other.cpu_set_); + swap(sched_policy_, other.sched_policy_); + swap(stack_size_, other.stack_size_); + swap(priority_, other.priority_); + swap(detached_flag_, other.detached_flag_); + swap(name_, other.name_); + } + +private: + CpuSet cpu_set_; + int sched_policy_; + std::size_t stack_size_; + int priority_; + bool detached_flag_; + std::string name_; + + int convert_sched_policy( + rcutils_thread_scheduling_policy_t sched_policy) + { + switch (sched_policy) { +#ifdef SCHED_FIFO + case RCUTILS_THREAD_SCHEDULING_POLICY_FIFO: + return SCHED_FIFO; +#endif +#ifdef SCHED_RR + case RCUTILS_THREAD_SCHEDULING_POLICY_RR: + return SCHED_RR; +#endif +#ifdef SCHED_OTHER + case RCUTILS_THREAD_SCHEDULING_POLICY_OTHER: + return SCHED_OTHER; +#endif +#ifdef SCHED_IDLE + case RCUTILS_THREAD_SCHEDULING_POLICY_IDLE: + return SCHED_IDLE; +#endif +#ifdef SCHED_BATCH + case RCUTILS_THREAD_SCHEDULING_POLICY_BATCH: + return SCHED_BATCH; +#endif +#ifdef SCHED_SPORADIC + case RCUTILS_THREAD_SCHEDULING_POLICY_SPORADIC: + return SCHED_SPORADIC; +#endif + /* Todo: Necessity and setting method need to be considered + #ifdef SCHED_DEADLINE + case RCUTILS_THREAD_SCHEDULING_POLICY_DEADLINE: + return SCHED_DEADLINE; + break; + #endif + */ + default: + throw std::runtime_error("Invalid scheduling policy"); + } + return -1; + } +}; + +inline void swap(ThreadAttribute & a, ThreadAttribute & b) +{ + a.swap(b); +} + +} // namespace detail + +} // namespace rcpputils + +#endif // RCPPUTILS__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ diff --git a/include/rcpputils/threads/posix/thread_func.hpp b/include/rcpputils/threads/posix/thread_func.hpp new file mode 100644 index 0000000..ec4ebd6 --- /dev/null +++ b/include/rcpputils/threads/posix/thread_func.hpp @@ -0,0 +1,53 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__POSIX__THREAD_FUNC_HPP_ +#define RCPPUTILS__THREADS__POSIX__THREAD_FUNC_HPP_ + +#include +#include +#include + +namespace rcpputils::detail +{ + +struct ThreadFuncBase +{ + virtual ~ThreadFuncBase() = default; + virtual void run() = 0; +}; + +template +struct ThreadFunc : ThreadFuncBase +{ + template + explicit ThreadFunc(G && g) + : func_(std::forward(g)) + {} + +private: + void run() override + { + func_(); + } + + F func_; +}; + +template +ThreadFunc(F &&)->ThreadFunc>; + +} // namespace rcpputils::detail + +#endif // RCPPUTILS__THREADS__POSIX__THREAD_FUNC_HPP_ diff --git a/include/rcpputils/threads/posix/thread_id.hpp b/include/rcpputils/threads/posix/thread_id.hpp new file mode 100644 index 0000000..5f46ce7 --- /dev/null +++ b/include/rcpputils/threads/posix/thread_id.hpp @@ -0,0 +1,146 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__POSIX__THREAD_ID_HPP_ +#define RCPPUTILS__THREADS__POSIX__THREAD_ID_HPP_ + +#include + +#include "rcpputils/visibility_control.hpp" + +namespace rcpputils +{ + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wmismatched-tags" +#endif + +struct Thread; + +namespace detail +{ + +namespace thread_id_ns +{ + +struct ThreadId; + +inline ThreadId get_id() noexcept; +inline bool operator==(ThreadId id1, ThreadId id2); +inline bool operator!=(ThreadId id1, ThreadId id2); +inline bool operator<(ThreadId id1, ThreadId id2); +inline bool operator>(ThreadId id1, ThreadId id2); +inline bool operator<=(ThreadId id1, ThreadId id2); +inline bool operator>=(ThreadId id1, ThreadId id2); +template +inline std::basic_ostream & operator<<( + std::basic_ostream &, + ThreadId); + +struct ThreadId +{ + ThreadId() = default; + ThreadId(ThreadId const &) = default; + ThreadId(ThreadId &&) = default; + ThreadId & operator=(ThreadId const &) = default; + ThreadId & operator=(ThreadId &&) = default; + + friend bool operator==(ThreadId id1, ThreadId id2) + { + return pthread_equal(id1.h, id2.h); + } + friend bool operator<(ThreadId id1, ThreadId id2) + { + return id1.h < id2.h; + } + template + friend std::basic_ostream & operator<<( + std::basic_ostream & ost, + ThreadId id) + { + return ost << id.h; + } + +private: + friend class rcpputils::Thread; + friend ThreadId get_id() noexcept; + friend struct std::hash; + explicit ThreadId(pthread_t h) + : h(h) {} + pthread_t h; +}; + +ThreadId get_id() noexcept +{ + return ThreadId{pthread_self()}; +} + +bool operator!=(ThreadId id1, ThreadId id2) +{ + return !(id1 == id2); +} + +bool operator>(ThreadId id1, ThreadId id2) +{ + return id2 < id1; +} + +bool operator<=(ThreadId id1, ThreadId id2) +{ + return !(id1 > id2); +} + +bool operator>=(ThreadId id1, ThreadId id2) +{ + return !(id1 < id2); +} + +} // namespace thread_id_ns + +using thread_id_ns::ThreadId; +using thread_id_ns::operator==; +using thread_id_ns::operator!=; +using thread_id_ns::operator<; // NOLINT +using thread_id_ns::operator>; // NOLINT +using thread_id_ns::operator<=; +using thread_id_ns::operator>=; +using thread_id_ns::operator<<; + +} // namespace detail + +namespace this_thread +{ + +using detail::thread_id_ns::get_id; + +} // namespace this_thread + +} // namespace rcpputils + +namespace std +{ + +template<> +struct hash +{ + std::size_t operator()(rcpputils::detail::thread_id_ns::ThreadId id) + { + return id.h; + } +}; + +} // namespace std + +#endif // RCPPUTILS__THREADS__POSIX__THREAD_ID_HPP_ diff --git a/include/rcpputils/threads/posix/utilities.hpp b/include/rcpputils/threads/posix/utilities.hpp new file mode 100644 index 0000000..4d9b054 --- /dev/null +++ b/include/rcpputils/threads/posix/utilities.hpp @@ -0,0 +1,42 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__POSIX__UTILITIES_HPP_ +#define RCPPUTILS__THREADS__POSIX__UTILITIES_HPP_ + +#include + +namespace rcpputils +{ + +namespace detail +{ + +namespace +{ + +inline void throw_if_error(int r, char const * msg) +{ + if (r != 0) { + throw std::system_error(r, std::system_category(), msg); + } +} + +} // namespace + +} // namespace detail + +} // namespace rcpputils + +#endif // RCPPUTILS__THREADS__POSIX__UTILITIES_HPP_ diff --git a/include/rcpputils/threads/std/thread.hpp b/include/rcpputils/threads/std/thread.hpp new file mode 100644 index 0000000..c0ac507 --- /dev/null +++ b/include/rcpputils/threads/std/thread.hpp @@ -0,0 +1,145 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__STD__THREAD_HPP_ +#define RCPPUTILS__THREADS__STD__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rcpputils/threads/std/thread_attribute.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rcpputils +{ + +struct Thread +{ + using NativeHandleType = std::thread::native_handle_type; + using Attribute = detail::ThreadAttribute; + using Id = std::thread::id; + + Thread() noexcept + : thread_{} + {} + Thread(Thread && other) + : thread_{} + { + swap(other); + } + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + { + if (attr.set_unavailable_items_) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + if (attr.get_run_as_detached()) { + thread_.detach(); + } + } + Thread(Thread const &) = delete; + ~Thread() {} + + Thread & operator=(Thread && other) noexcept + { + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(thread_, other.thread_); + } + + void join() + { + thread_.join(); + thread_ = std::thread{}; + } + + bool joinable() const noexcept + { + return thread_.joinable(); + } + + void detach() + { + thread_.detach(); + thread_ = std::thread{}; + } + + NativeHandleType native_handle() + { + return thread_.native_handle(); + } + + Id get_id() const noexcept + { + return thread_.get_id(); + } + + static int hardware_concurrency() noexcept + { + return std::thread::hardware_concurrency(); + } + +private: + std::thread thread_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace this_thread +{ + +template +void run_with_thread_attribute(Thread::Attribute & attr, F && f, Args && ... args) +{ + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that is ineffective on thread"); + + if (attr.set_unavailable_items_) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + + std::invoke(f, std::forward(args)...); +} + +} // namespace this_thread + +} // namespace rcpputils + +#endif // RCPPUTILS__THREADS__STD__THREAD_HPP_ diff --git a/include/rcpputils/threads/std/thread_attribute.hpp b/include/rcpputils/threads/std/thread_attribute.hpp new file mode 100644 index 0000000..b799cd6 --- /dev/null +++ b/include/rcpputils/threads/std/thread_attribute.hpp @@ -0,0 +1,165 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__STD__THREAD_ATTRIBUTE_HPP_ +#define RCPPUTILS__THREADS__STD__THREAD_ATTRIBUTE_HPP_ + +#include +#include +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +namespace rcpputils +{ + +struct Thread; + +namespace detail +{ +struct ThreadAttribute; +} // namespace detail + +namespace this_thread +{ +template +void run_with_thread_attribute( + detail::ThreadAttribute & attr, F && f, Args && ... args); +} // namespace this_thread + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = std::size_t; + CpuSet() {} + explicit CpuSet(std::size_t) {} + CpuSet(const CpuSet &) {} + CpuSet & operator=(const CpuSet &) + { + return *this; + } + CpuSet(CpuSet &&) = delete; + CpuSet & operator=(CpuSet &&) = delete; + ~CpuSet() {} + void set(std::size_t) {} + void unset(std::size_t) {} + void clear() {} + bool is_set(std::size_t) + { + return false; + } + std::size_t get_max_processors() const + { + return 0; + } + NativeCpuSetType native_cpu_set() const + { + return 0; + } +}; + +struct ThreadAttribute +{ + using PriorityType = int; + + ThreadAttribute() + : set_unavailable_items_(false), run_as_detached_(false) {} + + ThreadAttribute(const ThreadAttribute &) = default; + ThreadAttribute(ThreadAttribute &&) = default; + ThreadAttribute & operator=(const ThreadAttribute &) = default; + ThreadAttribute & operator=(ThreadAttribute &&) = default; + + ThreadAttribute & set_affinity(CpuSet &) + { + set_unavailable_items_ = true; + return *this; + } + CpuSet get_affinity() + { + return CpuSet{}; + } + + ThreadAttribute & set_stack_size(std::size_t) + { + set_unavailable_items_ = true; + return *this; + } + std::size_t get_stack_size() const + { + return 0; + } + + ThreadAttribute & set_priority(int prio) + { + (void)prio; + set_unavailable_items_ = true; + return *this; + } + int get_priority() const + { + return 0; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + run_as_detached_ = detach; + return *this; + } + bool get_run_as_detached() const + { + return run_as_detached_; + } + + ThreadAttribute & set_name(std::string const &) + { + set_unavailable_items_ = true; + return *this; + } + const char * get_name() const + { + return ""; + } + + void + set_thread_attribute( + const rcutils_thread_attr_t &) + { + set_unavailable_items_ = true; + } + + void swap( + ThreadAttribute & other) + { + std::swap(*this, other); + } + +private: + friend struct rcpputils::Thread; + template + friend void this_thread::run_with_thread_attribute( + ThreadAttribute & attr, F && f, Args && ... args); + + bool set_unavailable_items_; + bool run_as_detached_; +}; + +} // namespace detail + +} // namespace rcpputils + +#endif // RCPPUTILS__THREADS__STD__THREAD_ATTRIBUTE_HPP_ diff --git a/include/rcpputils/threads/windows/thread.hpp b/include/rcpputils/threads/windows/thread.hpp new file mode 100644 index 0000000..ec6613d --- /dev/null +++ b/include/rcpputils/threads/windows/thread.hpp @@ -0,0 +1,22 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCPPUTILS__THREADS__WINDOWS__THREAD_HPP_ +#define RCPPUTILS__THREADS__WINDOWS__THREAD_HPP_ + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. + +#endif // RCPPUTILS__THREADS__WINDOWS__THREAD_HPP_ diff --git a/src/threads/posix_thread.cpp b/src/threads/posix_thread.cpp new file mode 100644 index 0000000..e60b848 --- /dev/null +++ b/src/threads/posix_thread.cpp @@ -0,0 +1,183 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include "rcpputils/threads/posix/thread.hpp" +#include "rcpputils/threads/posix/utilities.hpp" + + +namespace rcpputils +{ + +namespace detail +{ + +namespace +{ +void set_pthread_attr(pthread_attr_t & native_attr, rcpputils::Thread::Attribute const & attr); +void * thread_main(void * p); +} // namespace + +} // namespace detail + +Thread::Thread(Attribute const * attr, std::unique_ptr func) +: handle_(NativeHandleType{}), name_(attr ? attr->get_name() : std::string{}) +{ + Attribute::NativeAttributeType native_attr; + int r = pthread_attr_init(&native_attr); + detail::throw_if_error(r, "Error in pthread_attr_init "); + + if (attr != nullptr) { + detail::set_pthread_attr(native_attr, *attr); + } + + NativeHandleType h; + r = pthread_create(&h, &native_attr, detail::thread_main, func.get()); + detail::throw_if_error(r, "Error in pthread_create "); + + if (attr == nullptr || !attr->get_run_as_detached()) { + this->handle_ = h; + } + + pthread_attr_destroy(&native_attr); + + func.release(); +} + +void Thread::apply_attr(Attribute const & attr) +{ + int r; + int policy = attr.get_sched_policy(); +#if __linux__ + if (policy != SCHED_FIFO && policy != SCHED_RR && policy != SCHED_OTHER) { + sched_param param; + param.sched_priority = attr.get_priority(); + r = pthread_setschedparam(pthread_self(), policy, ¶m); + detail::throw_if_error(r, "Error in pthread_setschedparam "); + } +#endif // #if __linux__ +} + +namespace detail +{ + +ThreadAttribute::ThreadAttribute() +{ + NativeAttributeType attr; + int r; + + r = pthread_attr_init(&attr); + throw_if_error(r, "Error in pthread_attr_init "); + + r = pthread_attr_getschedpolicy(&attr, &sched_policy_); + throw_if_error(r, "Error in pthread_attr_getschedpolicy "); + + r = pthread_attr_getstacksize(&attr, &stack_size_); + throw_if_error(r, "Error in pthread_attr_getstacksize "); + + sched_param param; + r = pthread_attr_getschedparam(&attr, ¶m); + throw_if_error(r, "Error in pthread_attr_getschedparam "); + priority_ = param.sched_priority; + + int flag; + r = pthread_attr_getdetachstate(&attr, &flag); + throw_if_error(r, "Error in pthread_attr_getdetachstate "); + detached_flag_ = (flag == PTHREAD_CREATE_DETACHED); + + pthread_attr_destroy(&attr); +} + + +void apply_attr_to_current_thread(ThreadAttribute const & attr) +{ + int r; + +#if __linux__ + CpuSet cpu_set = attr.get_affinity(); + CpuSet::NativeCpuSetType * native_cpu_set = cpu_set.native_cpu_set(); + if (native_cpu_set) { + std::size_t alloc_size = cpu_set.alloc_size(); + r = pthread_setaffinity_np(pthread_self(), alloc_size, native_cpu_set); + throw_if_error(r, "Error in sched_setaffinity "); + } +#endif // #if __linux__ + + sched_param param; + param.sched_priority = attr.get_priority(); + int policy = attr.get_sched_policy(); + r = pthread_setschedparam(pthread_self(), policy, ¶m); + throw_if_error(r, "Error in sched_setscheduler"); +} + +namespace +{ + +void * thread_main(void * p) +{ + std::unique_ptr func(reinterpret_cast(p)); + + try { + func->run(); + } catch (...) { + std::cerr << "failed to run thread" << std::endl; + std::terminate(); + } + + return nullptr; +} + +void set_pthread_attr(pthread_attr_t & native_attr, Thread::Attribute const & attr) +{ + int r; + +#if defined(__linux__) + CpuSet affinity = attr.get_affinity(); + size_t cpu_size = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_setaffinity_np(&native_attr, cpu_size, affinity.native_cpu_set()); + throw_if_error(r, "Error in pthread_attr_setaffinity_np "); +#endif + + std::size_t stack_size = attr.get_stack_size(); + r = pthread_attr_setstacksize(&native_attr, stack_size); + throw_if_error(r, "Error in pthread_attr_setstacksize "); + + int flag = attr.get_run_as_detached() ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE; + r = pthread_attr_setdetachstate(&native_attr, flag); + throw_if_error(r, "Error in pthread_attr_setdetachstate "); + + int sched_policy = attr.get_sched_policy(); + if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR) { + r = pthread_attr_setinheritsched(&native_attr, PTHREAD_EXPLICIT_SCHED); + throw_if_error(r, "Error in pthread_attr_setinheritsched "); + + r = pthread_attr_setschedpolicy(&native_attr, sched_policy); + throw_if_error(r, "Error in pthread_attr_setschedpolicy "); + + sched_param param; + param.sched_priority = attr.get_priority(); + r = pthread_attr_setschedparam(&native_attr, ¶m); + throw_if_error(r, "Error in pthread_attr_setschedparam "); + } +} + +} // namespace + +} // namespace detail + +} // namespace rcpputils diff --git a/src/threads/windows_thread.cpp b/src/threads/windows_thread.cpp new file mode 100644 index 0000000..a37a201 --- /dev/null +++ b/src/threads/windows_thread.cpp @@ -0,0 +1,19 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rcpputils/threads/windows/thread.hpp" + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG.