Autonomy Software C++ 24.5.1
Welcome to the Autonomy Software repository of the Mars Rover Design Team (MRDT) at Missouri University of Science and Technology (Missouri S&T)! This API reference contains the source code and other resources for the development of the autonomy software for our Mars rover. The Autonomy Software project aims to compete in the University Rover Challenge (URC) by demonstrating advanced autonomous capabilities and robust navigation algorithms.
BS_thread_pool.hpp
1
17#ifndef BS_THREAD_POOL_HPP
18#define BS_THREAD_POOL_HPP
19
20// We need to include <version> since if we're using `import std` it will not define any feature-test macros.
21#ifdef __has_include
22 #if __has_include(<version>)
23 #include <version> // NOLINT(misc-include-cleaner)
24 #endif
25#endif
26
27// At the time of this release, there is a bug in Clang with libc++ where using `std::jthread` in a C++20 module causes a compilation error. As a workaround, until the bug is fixed, the thread pool library automatically falls back to `std::thread` if it detects that Clang and libc++ are being used together with C++20 modules. This workaround can be disabled by defining `BS_THREAD_POOL_DISABLE_WORKAROUNDS` when compiling the module. TODO: Remove this workaround when the bug is fixed.
28#if defined(__clang__) && defined(_LIBCPP_VERSION) && defined(BS_THREAD_POOL_MODULE) && (__cplusplus >= 202002L) && !defined(BS_THREAD_POOL_DISABLE_WORKAROUNDS)
29 #ifdef __cpp_lib_jthread
30 #undef __cpp_lib_jthread
31 #endif
32#endif
33
34// At the time of this release, there is a bug when using GCC with libstdc++ on Windows via MSYS2 where the `BS.thread_pool` module doesn't compile if both native extensions and `import std` are enabled. As a workaround, until the bug is fixed, the thread pool library automatically falls back to header files if it detects that GCC and libstdc++ are being used together with the C++23 `std` module on Windows. This workaround can be disabled by defining `BS_THREAD_POOL_DISABLE_WORKAROUNDS` when compiling the module. TODO: Remove this workaround when the bug is fixed.
35#if (defined(__GNUC__) && defined(_GLIBCXX_RELEASE) && defined(_WIN32)) && !defined(BS_THREAD_POOL_DISABLE_WORKAROUNDS)
36 #ifdef BS_THREAD_POOL_IMPORT_STD
37 #undef BS_THREAD_POOL_IMPORT_STD
38 #endif
39#endif
40
41// In GCC with libstdc++ on Linux, loading the system headers after `import std` causes compilation errors, so we load them first.
42#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
43 #if defined(_WIN32)
44 #ifndef WIN32_LEAN_AND_MEAN
45 #define WIN32_LEAN_AND_MEAN
46 #endif
47 #ifndef NOMINMAX
48 #define NOMINMAX
49 #endif
50 #include <windows.h>
51 #elif defined(__linux__) || defined(__APPLE__)
52 #include <pthread.h>
53 #include <sched.h>
54 #include <sys/resource.h>
55 #include <unistd.h>
56 #if defined(__linux__)
57 #include <sys/syscall.h>
58 #include <sys/sysinfo.h>
59 #endif
60 #else
61 #undef BS_THREAD_POOL_NATIVE_EXTENSIONS
62 #endif
63#endif
64
65// If the macro `BS_THREAD_POOL_IMPORT_STD` is defined, import the C++ Standard Library as a module. Otherwise, include the relevant Standard Library header files.
66#if defined(BS_THREAD_POOL_IMPORT_STD) && (__cplusplus >= 202004L)
67 // Only allow importing the `std` module if the library itself is imported as a module. If the library is included as a header file, this will force the program that included the header file to also import `std`, which is not desirable and can lead to compilation errors if the program `#include`s any Standard Library header files.
68 #ifdef BS_THREAD_POOL_MODULE
69import std;
70 #else
71 #error "The thread pool library cannot import the C++ Standard Library as a module using `import std` if the library itself is not imported as a module. Either use `import BS.thread_pool` to import the library, or remove the `BS_THREAD_POOL_IMPORT_STD` macro. Aborting compilation."
72 #endif
73#else
74 #undef BS_THREAD_POOL_IMPORT_STD
75
76 #include <algorithm>
77 #include <chrono>
78 #include <condition_variable>
79 #include <cstddef>
80 #include <cstdint>
81 #include <functional>
82 #include <future>
83 #include <iostream>
84 #include <limits>
85 #include <memory>
86 #include <mutex>
87 #include <optional>
88 #include <queue>
89 #include <string>
90 #include <thread>
91 #include <tuple>
92 #include <type_traits>
93 #include <utility>
94 #include <variant>
95 #include <vector>
96
97 #ifdef __cpp_concepts
98 #include <concepts>
99 #endif
100 #ifdef __cpp_exceptions
101 #include <exception>
102 #include <stdexcept>
103 #endif
104 #ifdef __cpp_impl_three_way_comparison
105 #include <compare>
106 #endif
107 #ifdef __cpp_lib_int_pow2
108 #include <bit>
109 #endif
110 #ifdef __cpp_lib_jthread
111 #include <stop_token>
112 #endif
113#endif
114
115// On Linux, <sys/sysmacros.h> defines macros called `major` and `minor`, which we undefine here to prevent conflicts.
116#ifdef major
117 #undef major
118#endif
119#ifdef minor
120 #undef minor
121#endif
122
123// On Windows, <windows.h> defines macros called `min` and `max`, which we undefine here to prevent conflicts.
124#ifdef min
125 #undef min
126#endif
127#ifdef max
128 #undef max
129#endif
130
134namespace BS {
135// Macros indicating the version of the thread pool library.
136#define BS_THREAD_POOL_VERSION_MAJOR 5
137#define BS_THREAD_POOL_VERSION_MINOR 1
138#define BS_THREAD_POOL_VERSION_PATCH 0
139
143struct [[nodiscard]] version
144{
145 constexpr version(const std::uint64_t major_, const std::uint64_t minor_, const std::uint64_t patch_) noexcept : major(major_), minor(minor_), patch(patch_) {}
146
147// In C++20 and later we can use the spaceship operator `<=>` to automatically generate comparison operators. In C++17 we have to define them manually.
148#ifdef __cpp_impl_three_way_comparison
149 std::strong_ordering operator<=>(const version&) const = default;
150#else
151 [[nodiscard]] constexpr friend bool operator==(const version& lhs, const version& rhs) noexcept
152 {
153 return std::tuple(lhs.major, lhs.minor, lhs.patch) == std::tuple(rhs.major, rhs.minor, rhs.patch);
154 }
155
156 [[nodiscard]] constexpr friend bool operator!=(const version& lhs, const version& rhs) noexcept
157 {
158 return !(lhs == rhs);
159 }
160
161 [[nodiscard]] constexpr friend bool operator<(const version& lhs, const version& rhs) noexcept
162 {
163 return std::tuple(lhs.major, lhs.minor, lhs.patch) < std::tuple(rhs.major, rhs.minor, rhs.patch);
164 }
165
166 [[nodiscard]] constexpr friend bool operator>=(const version& lhs, const version& rhs) noexcept
167 {
168 return !(lhs < rhs);
169 }
170
171 [[nodiscard]] constexpr friend bool operator>(const version& lhs, const version& rhs) noexcept
172 {
173 return std::tuple(lhs.major, lhs.minor, lhs.patch) > std::tuple(rhs.major, rhs.minor, rhs.patch);
174 }
175
176 [[nodiscard]] constexpr friend bool operator<=(const version& lhs, const version& rhs) noexcept
177 {
178 return !(lhs > rhs);
179 }
180#endif
181
182 [[nodiscard]] std::string to_string() const
183 {
184 return std::to_string(major) + '.' + std::to_string(minor) + '.' + std::to_string(patch);
185 }
186
187 friend std::ostream& operator<<(std::ostream& stream, const version& ver)
188 {
189 stream << ver.to_string();
190 return stream;
191 }
192
193 std::uint64_t major;
194 std::uint64_t minor;
195 std::uint64_t patch;
196}; // struct version
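The C++17 fallback operators above compare versions by packing the three fields into a `std::tuple`, which compares lexicographically. The following is a minimal standalone sketch of that idea; the `semver` struct and the `is_older` helper are hypothetical names used only for illustration and are not part of the library.

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical stand-in for BS::version; not part of the library.
// The field names avoid `major`/`minor`, which <sys/sysmacros.h> may define as macros.
struct semver
{
    std::uint64_t major_v;
    std::uint64_t minor_v;
    std::uint64_t patch_v;
};

// std::tie builds a tuple of references. Tuple comparison is lexicographic,
// so the major field is compared first, then minor, then patch.
constexpr bool is_older(const semver& lhs, const semver& rhs) noexcept
{
    return std::tie(lhs.major_v, lhs.minor_v, lhs.patch_v) < std::tie(rhs.major_v, rhs.minor_v, rhs.patch_v);
}
```

With this helper, `is_older(semver{5, 0, 9}, semver{5, 1, 0})` is true because the minor field decides the comparison before the patch field is consulted.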
197
201inline constexpr version thread_pool_version(BS_THREAD_POOL_VERSION_MAJOR, BS_THREAD_POOL_VERSION_MINOR, BS_THREAD_POOL_VERSION_PATCH);
202
203#ifdef BS_THREAD_POOL_MODULE
204// If the library is being compiled as a module, ensure that the version of the module file matches the version of the header file.
205static_assert(thread_pool_version == version(BS_THREAD_POOL_MODULE), "The versions of BS.thread_pool.cppm and BS_thread_pool.hpp do not match. Aborting compilation.");
209inline constexpr bool thread_pool_module = true;
210#else
214inline constexpr bool thread_pool_module = false;
215#endif
216
217#ifdef BS_THREAD_POOL_IMPORT_STD
221inline constexpr bool thread_pool_import_std = true;
222#else
226inline constexpr bool thread_pool_import_std = false;
227#endif
228
229#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
233inline constexpr bool thread_pool_native_extensions = true;
234#else
238inline constexpr bool thread_pool_native_extensions = false;
239#endif
240
244using opt_t = std::uint8_t;
245
249enum class tp : opt_t
250{
254 none = 0,
255
259 priority = 1 << 0,
260
264 pause = 1 << 1,
265
269 wait_deadlock_checks = 1 << 2
270};
271
272// NOLINTBEGIN(bugprone-macro-parentheses)
273#define BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(ENUM, OP) \
274 constexpr ENUM operator OP(const ENUM lhs, const ENUM rhs) noexcept \
275 { \
276 return static_cast<ENUM>(static_cast<std::underlying_type_t<ENUM>>(lhs) OP static_cast<std::underlying_type_t<ENUM>>(rhs)); \
277 } \
278 constexpr ENUM& operator OP##=(ENUM& lhs, const ENUM rhs) noexcept \
279 { \
280 return lhs = lhs OP rhs; \
281 }
282// NOLINTEND(bugprone-macro-parentheses)
283
284BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(tp, &)
285BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(tp, |)
286BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(tp, ^)
287
288constexpr tp operator~(const tp value) noexcept
289{
290 return static_cast<tp>(~static_cast<std::underlying_type_t<tp>>(value));
291}
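The `tp` enumeration is a bitmask, and the operators defined above allow its values to be combined and masked. The sketch below shows, under stated assumptions, how such flags might be combined and queried. The `opts` enum and the `has_flag` helper are hypothetical and only mirror the shape of `BS::tp`; they are not the library's own names.

```cpp
#include <cstdint>
#include <type_traits>

// Hypothetical flag enum mirroring the shape of BS::tp; not the library's own type.
enum class opts : std::uint8_t
{
    none = 0,
    priority = 1 << 0,
    pause = 1 << 1,
    wait_deadlock_checks = 1 << 2
};

// Bitwise OR on the underlying integer type, cast back to the enum.
constexpr opts operator|(const opts lhs, const opts rhs) noexcept
{
    using U = std::underlying_type_t<opts>;
    return static_cast<opts>(static_cast<U>(lhs) | static_cast<U>(rhs));
}

// Bitwise AND on the underlying integer type, cast back to the enum.
constexpr opts operator&(const opts lhs, const opts rhs) noexcept
{
    using U = std::underlying_type_t<opts>;
    return static_cast<opts>(static_cast<U>(lhs) & static_cast<U>(rhs));
}

// Returns true if every bit of `flag` is also set in `value`.
constexpr bool has_flag(const opts value, const opts flag) noexcept
{
    return (value & flag) == flag;
}
```

Because all of these functions are `constexpr`, a check such as `has_flag(opts::priority | opts::pause, opts::pause)` can be evaluated at compile time, which is what makes a flag of this kind usable as a template argument.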
292
293template <tp>
294class thread_pool;
295
296#ifdef __cpp_lib_move_only_function
300using std::move_only_function;
301#else
302template <typename...>
303class move_only_function;
304
311template <typename R, typename... Args>
312class move_only_function<R(Args...)>
313{
314public:
315 move_only_function() = default;
316 move_only_function(move_only_function&&) noexcept = default;
317 move_only_function& operator=(move_only_function&&) noexcept = default;
318 move_only_function(const move_only_function&) = delete;
319 move_only_function& operator=(const move_only_function&) = delete;
320 ~move_only_function() = default;
321
322 template <typename F, typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, move_only_function> && std::is_invocable_r_v<R, F&, Args...>>>
323 move_only_function(F&& func) : ptr(std::make_unique<func_model<std::decay_t<F>>>(std::forward<F>(func))) {} // NOLINT(hicpp-explicit-conversions)
324
325 R operator()(Args... args)
326 {
327 return ptr->call(std::forward<Args>(args)...);
328 }
329
330private:
331 struct func_concept
332 {
333 virtual ~func_concept() = default;
334 virtual R call(Args... args) = 0;
335 };
336
337 template <typename F>
338 struct func_model final : func_concept
339 {
340 template <typename T, typename = std::enable_if_t<!std::is_same_v<std::decay_t<T>, func_model>>>
341 explicit func_model(T&& func) : stored_func(std::forward<T>(func)) {}
342
343 R call(Args... args) override
344 {
345 if constexpr (std::is_void_v<R>)
346 {
347 std::invoke(stored_func, std::forward<Args>(args)...);
348 }
349 else
350 {
351 return std::invoke(stored_func, std::forward<Args>(args)...);
352 }
353 }
354
355 F stored_func;
356 };
357
358 std::unique_ptr<func_concept> ptr = nullptr;
359};
360#endif
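The fallback `move_only_function` above relies on type erasure: an abstract base with a virtual `call`, a templated derived class that stores the concrete callable, and a `std::unique_ptr` that owns it. The following simplified sketch applies the same pattern to `int()` callables only. The names `unique_call` and `make_counter` are hypothetical and are not part of the library.

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical, simplified wrapper for `int()` callables only; not the library's class.
class unique_call
{
public:
    // Accept any callable except unique_call itself, so this never hijacks the move constructor.
    template <typename F, typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, unique_call>>>
    unique_call(F&& func) : ptr(std::make_unique<model<std::decay_t<F>>>(std::forward<F>(func))) {}

    unique_call(unique_call&&) noexcept = default;
    unique_call& operator=(unique_call&&) noexcept = default;

    int operator()()
    {
        return ptr->call();
    }

private:
    // Abstract interface that hides the concrete callable type.
    struct concept_base
    {
        virtual ~concept_base() = default;
        virtual int call() = 0;
    };

    // Concrete holder for a callable of type F.
    template <typename F>
    struct model final : concept_base
    {
        template <typename T>
        explicit model(T&& func) : stored(std::forward<T>(func)) {}

        int call() override
        {
            return stored();
        }

        F stored;
    };

    std::unique_ptr<concept_base> ptr;
};

// Wraps a lambda that owns a std::unique_ptr. Such a lambda is not copyable,
// so it could not be stored in a std::function.
inline unique_call make_counter(const int start)
{
    return unique_call([value = std::make_unique<int>(start)]() mutable { return (*value)++; });
}
```

Each call to the wrapper returned by `make_counter(41)` yields the next integer, starting at 41, and the wrapper can be moved but not copied.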
361
365using task_t = move_only_function<void()>;
366
367#ifdef __cpp_lib_jthread
371using thread_t = std::jthread;
372 // The following macros are used to determine how to stop the workers. In C++20 and later we can use `std::stop_token`.
373 #define BS_THREAD_POOL_WORKER_TOKEN const std::stop_token &stop_token,
374 #define BS_THREAD_POOL_WAIT_TOKEN , stop_token
375 #define BS_THREAD_POOL_STOP_CONDITION stop_token.stop_requested()
376 #define BS_THREAD_POOL_OR_STOP_CONDITION
377#else
381using thread_t = std::thread;
382 // The following macros are used to determine how to stop the workers. In C++17 we use a manual flag `workers_running`.
383 #define BS_THREAD_POOL_WORKER_TOKEN
384 #define BS_THREAD_POOL_WAIT_TOKEN
385 #define BS_THREAD_POOL_STOP_CONDITION !workers_running
386 #define BS_THREAD_POOL_OR_STOP_CONDITION || !workers_running
387#endif
388
392using priority_t = std::int8_t;
393
397enum pr : priority_t // NOLINT(cppcoreguidelines-use-enum-class) This cannot be an `enum class` because we need the numerical values.
398{
399 lowest = -128,
400 low = -64,
401 normal = 0,
402 high = +64,
403 highest = +127
404};
405
409struct [[nodiscard]] pr_task
410{
417 explicit pr_task(task_t&& task_, const priority_t priority_ = 0) noexcept(std::is_nothrow_move_constructible_v<task_t>) : task(std::move(task_)), priority(priority_) {}
418
426 [[nodiscard]] friend bool operator<(const pr_task& lhs, const pr_task& rhs) noexcept
427 {
428 return lhs.priority < rhs.priority;
429 }
430
434 mutable task_t task;
435
439 priority_t priority = 0;
440}; // struct pr_task
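`pr_task` orders tasks by comparing their priorities with `operator<`. Assuming such tasks are stored in a `std::priority_queue` (the queue itself is not part of the listing shown here), the largest priority is popped first. A minimal sketch with a hypothetical `job` struct and `drain_in_priority_order` helper:

```cpp
#include <cstdint>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a prioritized task; not the library's pr_task.
struct job
{
    std::string name;
    std::int8_t priority;
};

// Same ordering rule as pr_task: compare by priority only.
inline bool operator<(const job& lhs, const job& rhs) noexcept
{
    return lhs.priority < rhs.priority;
}

// Pushes all jobs into a max-heap and returns their names in the order they are popped.
inline std::vector<std::string> drain_in_priority_order(std::vector<job> jobs)
{
    std::priority_queue<job> queue;
    for (job& j : jobs)
        queue.push(std::move(j));
    std::vector<std::string> order;
    while (!queue.empty())
    {
        order.push_back(queue.top().name);
        queue.pop();
    }
    return order;
}
```

Given jobs with priorities -64, 127, and 0, the helper returns them in the order 127, 0, -64, regardless of insertion order.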
441
442// In C++20 and later we can use concepts. In C++17 we instead use SFINAE ("Substitution Failure Is Not An Error") with `std::enable_if_t`.
443#ifdef __cpp_concepts
444 #define BS_THREAD_POOL_IF_PAUSE_ENABLED template <bool P = pause_enabled> requires(P)
445template <typename F>
446concept init_func_c = std::invocable<F> || std::invocable<F, std::size_t>;
447 #define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) init_func_c F
448#else
449 #define BS_THREAD_POOL_IF_PAUSE_ENABLED template <bool P = pause_enabled, typename = std::enable_if_t<P>>
450 #define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) typename F, typename = std::enable_if_t<std::is_invocable_v<F> || std::is_invocable_v<F, std::size_t>> // NOLINT(bugprone-macro-parentheses)
451#endif
452
458template <typename T>
459class [[nodiscard]] multi_future : public std::vector<std::future<T>>
460{
461public:
462 // Inherit all constructors from the base class `std::vector`.
463 using std::vector<std::future<T>>::vector;
464
470 [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>> get()
471 {
472 if constexpr (std::is_void_v<T>)
473 {
474 for (std::future<T>& future : *this)
475 future.get();
476 return;
477 }
478 else
479 {
480 std::vector<T> results;
481 results.reserve(this->size());
482 for (std::future<T>& future : *this)
483 results.push_back(future.get());
484 return results;
485 }
486 }
487
493 [[nodiscard]] std::size_t ready_count() const
494 {
495 std::size_t count = 0;
496 for (const std::future<T>& future : *this)
497 {
498 if (future.wait_for(std::chrono::duration<double>::zero()) == std::future_status::ready)
499 ++count;
500 }
501 return count;
502 }
503
509 [[nodiscard]] bool valid() const noexcept
510 {
511 bool is_valid = true;
512 for (const std::future<T>& future : *this)
513 is_valid = is_valid && future.valid();
514 return is_valid;
515 }
516
520 void wait() const
521 {
522 for (const std::future<T>& future : *this)
523 future.wait();
524 }
525
534 template <typename R, typename P>
535 bool wait_for(const std::chrono::duration<R, P>& duration) const
536 {
537 const std::chrono::time_point<std::chrono::steady_clock> start_time = std::chrono::steady_clock::now();
538 for (const std::future<T>& future : *this)
539 {
540 future.wait_for(duration - (std::chrono::steady_clock::now() - start_time));
541 if (duration < std::chrono::steady_clock::now() - start_time)
542 return false;
543 }
544 return true;
545 }
546
555 template <typename C, typename D>
556 bool wait_until(const std::chrono::time_point<C, D>& timeout_time) const
557 {
558 for (const std::future<T>& future : *this)
559 {
560 future.wait_until(timeout_time);
561 if (timeout_time < C::now())
562 return false;
563 }
564 return true;
565 }
566}; // class multi_future
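`multi_future<T>::get()` waits for each stored future in turn and gathers the results into a `std::vector<T>`. The sketch below reproduces that behavior for a plain vector of futures. It uses `std::async` instead of a thread pool so that it stands alone, and the helper names `get_all` and `squares` are hypothetical.

```cpp
#include <future>
#include <vector>

// Hypothetical helper: waits for every future in order and gathers the results,
// similar in spirit to multi_future<T>::get() for non-void T.
template <typename T>
std::vector<T> get_all(std::vector<std::future<T>>& futures)
{
    std::vector<T> results;
    results.reserve(futures.size());
    for (std::future<T>& future : futures)
        results.push_back(future.get());
    return results;
}

// Launches one asynchronous task per input and returns the squares in input order.
inline std::vector<int> squares(const std::vector<int>& inputs)
{
    std::vector<std::future<int>> futures;
    futures.reserve(inputs.size());
    for (const int x : inputs)
        futures.push_back(std::async(std::launch::async, [x] { return x * x; }));
    return get_all(futures);
}
```

The results come back in the order in which the futures were stored, not in the order in which the tasks happen to finish.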
567
573template <typename T>
574class [[nodiscard]] blocks
575{
576public:
584 blocks(const T first_index_, const T index_after_last_, const std::size_t num_blocks_) noexcept : num_blocks(num_blocks_), first_index(first_index_), index_after_last(index_after_last_)
585 {
586 if (index_after_last > first_index)
587 {
588 const std::size_t total_size = static_cast<std::size_t>(index_after_last - first_index);
589 num_blocks = std::min(num_blocks, total_size);
590 block_size = total_size / num_blocks;
591 remainder = total_size % num_blocks;
592 if (block_size == 0)
593 {
594 block_size = 1;
595 num_blocks = (total_size > 1) ? total_size : 1;
596 }
597 }
598 else
599 {
600 num_blocks = 0;
601 }
602 }
603
610 [[nodiscard]] T end(const std::size_t block) const noexcept
611 {
612 return (block == num_blocks - 1) ? index_after_last : start(block + 1);
613 }
614
620 [[nodiscard]] std::size_t get_num_blocks() const noexcept
621 {
622 return num_blocks;
623 }
624
631 [[nodiscard]] T start(const std::size_t block) const noexcept
632 {
633 return first_index + static_cast<T>(block * block_size) + static_cast<T>(block < remainder ? block : remainder);
634 }
635
636private:
640 std::size_t block_size = 0;
641
645 std::size_t num_blocks = 0;
646
650 std::size_t remainder = 0;
651
655 T first_index = 0;
656
660 T index_after_last = 0;
661}; // class blocks
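The `blocks` class splits a half-open index range into nearly equal blocks, giving the first `remainder` blocks one extra index each. The sketch below reproduces that arithmetic as a hypothetical free function, `split_range`, that returns all `[start, end)` pairs at once. The early return for `num_blocks == 0` is an addition of this sketch and is not taken from the class above.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical free function reproducing the arithmetic of blocks::start() and blocks::end().
inline std::vector<std::pair<std::size_t, std::size_t>> split_range(const std::size_t first, const std::size_t after_last, std::size_t num_blocks)
{
    std::vector<std::pair<std::size_t, std::size_t>> result;
    // Guard added for this sketch: an empty range or zero blocks yields no blocks.
    if (after_last <= first || num_blocks == 0)
        return result;
    const std::size_t total = after_last - first;
    // Never create more blocks than there are indices.
    if (num_blocks > total)
        num_blocks = total;
    const std::size_t block_size = total / num_blocks;
    const std::size_t remainder = total % num_blocks;
    std::size_t start = first;
    for (std::size_t b = 0; b < num_blocks; ++b)
    {
        // The first `remainder` blocks receive one extra index each.
        const std::size_t end = start + block_size + (b < remainder ? 1 : 0);
        result.emplace_back(start, end);
        start = end;
    }
    return result;
}
```

For example, splitting the range `[0, 10)` into 3 blocks gives a block size of 3 with a remainder of 1, so the blocks are `[0, 4)`, `[4, 7)`, and `[7, 10)`.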
662
670template <typename T, typename F, typename R>
671struct block_task
672{
673 R operator()()
674 {
675 return (*block_ptr)(start, end);
676 }
677
678 std::shared_ptr<std::decay_t<F>> block_ptr;
679 T start;
680 T end;
681}; // struct block_task
682
689template <typename T, typename F>
690struct loop_task
691{
692 void operator()()
693 {
694 for (T i = start; i < end; ++i)
695 (*loop_ptr)(i);
696 }
697
698 std::shared_ptr<std::decay_t<F>> loop_ptr;
699 T start;
700 T end;
701}; // struct loop_task
702
710template <typename T, typename F, typename R>
711struct sequence_task
712{
713 R operator()()
714 {
715 return (*sequence_ptr)(i);
716 }
717
718 std::shared_ptr<std::decay_t<F>> sequence_ptr;
719 T i;
720}; // struct sequence_task
721
727template <typename R>
728struct task_and_future
729{
730 template <typename F, typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, task_and_future<R>>>>
731 explicit task_and_future(F&& func)
732 {
733 std::promise<R> promise;
734 future = promise.get_future();
735 task = [task = std::forward<F>(func), promise = std::move(promise)]() mutable
736 {
737#ifdef __cpp_exceptions
738 try
739 {
740#endif
741 if constexpr (std::is_void_v<R>)
742 {
743 task();
744 promise.set_value();
745 }
746 else
747 {
748 promise.set_value(task());
749 }
750#ifdef __cpp_exceptions
751 }
752 catch (...)
753 {
754 try
755 {
756 promise.set_exception(std::current_exception());
757 }
758 catch (...)
759 {
760 }
761 }
762#endif
763 };
764 }
765
766 std::future<R> future;
767 task_t task;
768}; // struct task_and_future
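`task_and_future` pairs a `void()` task with a `std::future`: running the task fulfils a `std::promise` with either the return value or the exception that was thrown. The sketch below shows the same idea for `int`-returning callables. The helper `make_task` is hypothetical, and unlike the struct above it keeps the promise in a `std::shared_ptr` so that the task is copyable and fits in a `std::function`.

```cpp
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical helper: returns a void() task plus the future that observes its outcome.
// A shared_ptr holds the promise so the lambda is copyable, as std::function requires.
template <typename F>
std::pair<std::function<void()>, std::future<int>> make_task(F func)
{
    auto promise = std::make_shared<std::promise<int>>();
    std::future<int> future = promise->get_future();
    std::function<void()> task = [func = std::move(func), promise]() mutable
    {
        try
        {
            // Success path: forward the return value to the future.
            promise->set_value(func());
        }
        catch (...)
        {
            // Failure path: forward the exception so future.get() rethrows it.
            promise->set_exception(std::current_exception());
        }
    };
    return {std::move(task), std::move(future)};
}
```

The caller runs the task on any thread and later calls `get()` on the future, which either returns the value or rethrows the exception raised inside the task.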
769
770#ifdef __cpp_exceptions
774struct [[nodiscard]] wait_deadlock : public std::runtime_error
775{
776 wait_deadlock() : std::runtime_error("BS::wait_deadlock") {}
777};
778#endif
779
780#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
781 #if defined(_WIN32)
785enum class os_process_priority
786{
787 idle = IDLE_PRIORITY_CLASS,
788 below_normal = BELOW_NORMAL_PRIORITY_CLASS,
789 normal = NORMAL_PRIORITY_CLASS,
790 above_normal = ABOVE_NORMAL_PRIORITY_CLASS,
791 high = HIGH_PRIORITY_CLASS,
792 realtime = REALTIME_PRIORITY_CLASS
793};
794
798enum class os_thread_priority
799{
800 idle = THREAD_PRIORITY_IDLE,
801 lowest = THREAD_PRIORITY_LOWEST,
802 below_normal = THREAD_PRIORITY_BELOW_NORMAL,
803 normal = THREAD_PRIORITY_NORMAL,
804 above_normal = THREAD_PRIORITY_ABOVE_NORMAL,
805 highest = THREAD_PRIORITY_HIGHEST,
806 realtime = THREAD_PRIORITY_TIME_CRITICAL
807};
808 #elif defined(__linux__) || defined(__APPLE__)
812enum class os_process_priority
813{
814 idle = PRIO_MAX - 2,
815 below_normal = PRIO_MAX / 2,
816 normal = 0,
817 above_normal = PRIO_MIN / 3,
818 high = PRIO_MIN * 2 / 3,
819 realtime = PRIO_MIN
820};
821
825enum class os_thread_priority
826{
827 idle,
828 lowest,
829 below_normal,
830 normal,
831 above_normal,
832 highest,
833 realtime
834};
835 #endif
836
842[[nodiscard]] inline std::optional<std::vector<bool>> get_os_process_affinity()
843{
844 #if defined(_WIN32)
845 DWORD_PTR process_mask = 0;
846 DWORD_PTR system_mask = 0;
847 if (GetProcessAffinityMask(GetCurrentProcess(), &process_mask, &system_mask) == 0)
848 return std::nullopt;
849 #ifdef __cpp_lib_int_pow2
850 const std::size_t num_cpus = static_cast<std::size_t>(std::bit_width(system_mask));
851 #else
852 std::size_t num_cpus = 0;
853 if (system_mask != 0)
854 {
855 num_cpus = 1;
856 while ((system_mask >>= 1U) != 0U)
857 ++num_cpus;
858 }
859 #endif
860 std::vector<bool> affinity(num_cpus);
861 for (std::size_t i = 0; i < num_cpus; ++i)
862 affinity[i] = ((process_mask & (1ULL << i)) != 0ULL);
863 return affinity;
864 #elif defined(__linux__)
865 cpu_set_t cpu_set;
866 CPU_ZERO(&cpu_set);
867 if (sched_getaffinity(getpid(), sizeof(cpu_set_t), &cpu_set) != 0)
868 return std::nullopt;
869 const int num_cpus = get_nprocs();
870 if (num_cpus < 1)
871 return std::nullopt;
872 std::vector<bool> affinity(static_cast<std::size_t>(num_cpus));
873 for (std::size_t i = 0; i < affinity.size(); ++i)
874 affinity[i] = CPU_ISSET(i, &cpu_set);
875 return affinity;
876 #elif defined(__APPLE__)
877 return std::nullopt;
878 #endif
879}
880
887inline bool set_os_process_affinity([[maybe_unused]] const std::vector<bool>& affinity)
888{
889 #if defined(_WIN32)
890 DWORD_PTR process_mask = 0;
891 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(), sizeof(DWORD_PTR) * 8); ++i)
892 process_mask |= (affinity[i] ? (1ULL << i) : 0ULL);
893 return SetProcessAffinityMask(GetCurrentProcess(), process_mask) != 0;
894 #elif defined(__linux__)
895 cpu_set_t cpu_set;
896 CPU_ZERO(&cpu_set);
897 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(), CPU_SETSIZE); ++i)
898 {
899 if (affinity[i])
900 CPU_SET(i, &cpu_set);
901 }
902 return sched_setaffinity(getpid(), sizeof(cpu_set_t), &cpu_set) == 0;
903 #elif defined(__APPLE__)
904 return false;
905 #endif
906}
907
913[[nodiscard]] inline std::optional<os_process_priority> get_os_process_priority()
914{
915 #if defined(_WIN32)
916 // On Windows, this is straightforward.
917 const DWORD priority = GetPriorityClass(GetCurrentProcess());
918 if (priority == 0)
919 return std::nullopt;
920 return static_cast<os_process_priority>(priority);
921 #elif defined(__linux__) || defined(__APPLE__)
922 // On Linux/macOS there is no direct analogue of `GetPriorityClass()` on Windows, so instead we get the "nice" value. The usual range is -20 to 19 or 20, with higher values corresponding to lower priorities. However, we are only using 6 pre-defined values for portability, so if the value was set via any means other than `BS::set_os_process_priority()`, it may not match one of our pre-defined values. Note that `getpriority()` returns -1 on error, but since this does not correspond to any of our pre-defined values, this function will return `std::nullopt` anyway.
923 const int nice_val = getpriority(PRIO_PROCESS, static_cast<id_t>(getpid()));
924 switch (nice_val)
925 {
926 case static_cast<int>(os_process_priority::idle):
927 return os_process_priority::idle;
928 case static_cast<int>(os_process_priority::below_normal):
929 return os_process_priority::below_normal;
930 case static_cast<int>(os_process_priority::normal):
931 return os_process_priority::normal;
932 case static_cast<int>(os_process_priority::above_normal):
933 return os_process_priority::above_normal;
934 case static_cast<int>(os_process_priority::high):
935 return os_process_priority::high;
936 case static_cast<int>(os_process_priority::realtime):
937 return os_process_priority::realtime;
938 default:
939 return std::nullopt;
940 }
941 #endif
942}
943
950inline bool set_os_process_priority(const os_process_priority priority)
951{
952 #if defined(_WIN32)
953 // On Windows, this is straightforward.
954 return SetPriorityClass(GetCurrentProcess(), static_cast<DWORD>(priority)) != 0;
955 #elif defined(__linux__) || defined(__APPLE__)
956 // On Linux/macOS there is no direct analogue of `SetPriorityClass()` on Windows, so instead we set the "nice" value. The usual range is -20 to 19 or 20, with higher values corresponding to lower priorities. However, we are only using 6 pre-defined values for portability. Note that the "nice" values are only relevant for the `SCHED_OTHER` policy, but we do not set that policy here, as it is per-thread rather than per-process.
957 // Also, it's important to note that a non-root user cannot decrease the nice value (i.e. increase the process priority), only increase it. This can cause confusing behavior. For example, if the current priority is `BS::os_process_priority::normal` and the user sets it to `BS::os_process_priority::idle`, they cannot change it back to `BS::os_process_priority::normal`.
958 return setpriority(PRIO_PROCESS, static_cast<id_t>(getpid()), static_cast<int>(priority)) == 0;
959 #endif
960}
961#endif
962
966class [[nodiscard]] this_thread
967{
968 template <tp>
969 friend class thread_pool;
970
971public:
977 [[nodiscard]] static std::optional<std::size_t> get_index() noexcept
978 {
979 return my_index;
980 }
981
987 [[nodiscard]] static std::optional<void*> get_pool() noexcept
988 {
989 return my_pool;
990 }
991
992#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
998 [[nodiscard]] static std::optional<std::vector<bool>> get_os_thread_affinity()
999 {
1000 #if defined(_WIN32)
1001 // Windows does not have a `GetThreadAffinityMask()` function, but `SetThreadAffinityMask()` returns the previous affinity mask, so we can use that to get the current affinity and then restore it. It's a bit of a hack, but it works. Since the thread affinity must be a subset of the process affinity, we use the process affinity as the temporary value.
1002 DWORD_PTR process_mask = 0;
1003 DWORD_PTR system_mask = 0;
1004 if (GetProcessAffinityMask(GetCurrentProcess(), &process_mask, &system_mask) == 0)
1005 return std::nullopt;
1006 const DWORD_PTR previous_mask = SetThreadAffinityMask(GetCurrentThread(), process_mask);
1007 if (previous_mask == 0)
1008 return std::nullopt;
1009 SetThreadAffinityMask(GetCurrentThread(), previous_mask);
1010 #ifdef __cpp_lib_int_pow2
1011 const std::size_t num_cpus = static_cast<std::size_t>(std::bit_width(system_mask));
1012 #else
1013 std::size_t num_cpus = 0;
1014 if (system_mask != 0)
1015 {
1016 num_cpus = 1;
1017 while ((system_mask >>= 1U) != 0U)
1018 ++num_cpus;
1019 }
1020 #endif
1021 std::vector<bool> affinity(num_cpus);
1022 for (std::size_t i = 0; i < num_cpus; ++i)
1023 affinity[i] = ((previous_mask & (1ULL << i)) != 0ULL);
1024 return affinity;
1025 #elif defined(__linux__) && !defined(__ANDROID__)
1026 cpu_set_t cpu_set;
1027 CPU_ZERO(&cpu_set);
1028 if (pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_set) != 0)
1029 return std::nullopt;
1030 const int num_cpus = get_nprocs();
1031 if (num_cpus < 1)
1032 return std::nullopt;
1033 std::vector<bool> affinity(static_cast<std::size_t>(num_cpus));
1034 for (std::size_t i = 0; i < affinity.size(); ++i)
1035 affinity[i] = CPU_ISSET(i, &cpu_set);
1036 return affinity;
1037 #else
1038 return std::nullopt;
1039 #endif
1040 }
1041
1048 static bool set_os_thread_affinity([[maybe_unused]] const std::vector<bool>& affinity)
1049 {
1050 #if defined(_WIN32)
1051 DWORD_PTR thread_mask = 0;
1052 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(), sizeof(DWORD_PTR) * 8); ++i)
1053 thread_mask |= (affinity[i] ? (1ULL << i) : 0ULL);
1054 return SetThreadAffinityMask(GetCurrentThread(), thread_mask) != 0;
1055 #elif defined(__linux__) && !defined(__ANDROID__)
1056 cpu_set_t cpu_set;
1057 CPU_ZERO(&cpu_set);
1058 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(), CPU_SETSIZE); ++i)
1059 {
1060 if (affinity[i])
1061 CPU_SET(i, &cpu_set);
1062 }
1063 return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_set) == 0;
1064 #else
1065 return false;
1066 #endif
1067 }
1068
1074 [[nodiscard]] static std::optional<std::string> get_os_thread_name()
1075 {
1076 #if defined(_WIN32)
1077 // On Windows thread names are wide strings, so we need to convert them to normal strings.
1078 PWSTR data = nullptr;
1079 const HRESULT hr = GetThreadDescription(GetCurrentThread(), &data);
1080 if (FAILED(hr))
1081 return std::nullopt;
1082 if (data == nullptr)
1083 return std::nullopt;
1084 const int size = WideCharToMultiByte(CP_UTF8, 0, data, -1, nullptr, 0, nullptr, nullptr);
1085 if (size == 0)
1086 {
1087 LocalFree(data);
1088 return std::nullopt;
1089 }
1090 std::string name(static_cast<std::size_t>(size) - 1, 0);
1091 const int result = WideCharToMultiByte(CP_UTF8, 0, data, -1, name.data(), size, nullptr, nullptr);
1092 LocalFree(data);
1093 if (result == 0)
1094 return std::nullopt;
1095 return name;
1096 #elif defined(__linux__) || defined(__APPLE__)
1097 #ifdef __linux__
1098 // On Linux thread names are limited to 16 characters, including the null terminator.
1099 constexpr std::size_t buffer_size = 16;
1100 #else
1101 // On macOS thread names are limited to 64 characters, including the null terminator.
1102 constexpr std::size_t buffer_size = 64;
1103 #endif
1104 char name[buffer_size] = {};
1105 if (pthread_getname_np(pthread_self(), name, buffer_size) != 0)
1106 return std::nullopt;
1107 return std::string(name);
1108 #endif
1109 }
1110
1117 static bool set_os_thread_name(const std::string& name)
1118 {
1119 #if defined(_WIN32)
1120 // On Windows thread names are wide strings, so we need to convert them from normal strings.
1121 const int size = MultiByteToWideChar(CP_UTF8, 0, name.data(), -1, nullptr, 0);
1122 if (size == 0)
1123 return false;
1124 std::wstring wide(static_cast<std::size_t>(size), 0);
1125 if (MultiByteToWideChar(CP_UTF8, 0, name.data(), -1, wide.data(), size) == 0)
1126 return false;
1127 const HRESULT hr = SetThreadDescription(GetCurrentThread(), wide.data());
1128 return SUCCEEDED(hr);
1129 #elif defined(__linux__)
1130 // On Linux this is straightforward.
1131 return pthread_setname_np(pthread_self(), name.data()) == 0;
1132 #elif defined(__APPLE__)
1133 // On macOS, unlike Linux, a thread can only set a name for itself, so the signature is different.
1134 return pthread_setname_np(name.data()) == 0;
1135 #endif
1136 }
1137
1143 [[nodiscard]] static std::optional<os_thread_priority> get_os_thread_priority()
1144 {
1145 #if defined(_WIN32)
1146 // On Windows, this is straightforward.
1147 const int priority = GetThreadPriority(GetCurrentThread());
1148 if (priority == THREAD_PRIORITY_ERROR_RETURN)
1149 return std::nullopt;
1150 return static_cast<os_thread_priority>(priority);
1151 #elif defined(__linux__)
1152 // On Linux, we distill the choices of scheduling policy, priority, and "nice" value into 7 pre-defined levels, for simplicity and portability. The total number of possible combinations of policies and priorities is much larger, so if the value was set via any means other than `BS::this_thread::set_os_thread_priority()`, it may not match one of our pre-defined values.
1153 int policy = 0;
1154 struct sched_param param = {};
1155 if (pthread_getschedparam(pthread_self(), &policy, &param) != 0)
1156 return std::nullopt;
1157 if (policy == SCHED_FIFO && param.sched_priority == sched_get_priority_max(SCHED_FIFO))
1158 {
1159 // The only pre-defined priority that uses SCHED_FIFO and the maximum available priority value is the "realtime" priority.
1160 return os_thread_priority::realtime;
1161 }
1162 if (policy == SCHED_RR && param.sched_priority == sched_get_priority_min(SCHED_RR) + ((sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2))
1163 {
1164 // The only pre-defined priority that uses SCHED_RR and a priority in the middle of the available range is the "highest" priority.
1165 return os_thread_priority::highest;
1166 }
1167 #ifdef __linux__
1168 if (policy == SCHED_IDLE)
1169 {
1170 // The only pre-defined priority that uses SCHED_IDLE is the "idle" priority. Note that this scheduling policy is not available on macOS.
1171 return os_thread_priority::idle;
1172 }
1173 #endif
1174 if (policy == SCHED_OTHER)
1175 {
1176 // For SCHED_OTHER, the result depends on the "nice" value. The usual range is -20 to 19 or 20, with higher values corresponding to lower priorities. Note that `getpriority()` returns -1 on error, but since this does not correspond to any of our pre-defined values, this function will return `std::nullopt` anyway.
1177 const int nice_val = getpriority(PRIO_PROCESS, static_cast<id_t>(syscall(SYS_gettid)));
1178 switch (nice_val)
1179 {
1180 case PRIO_MIN + 2:
1181 return os_thread_priority::above_normal;
1182 case 0:
1183 return os_thread_priority::normal;
1184 case (PRIO_MAX / 2) + (PRIO_MAX % 2):
1185 return os_thread_priority::below_normal;
1186 case PRIO_MAX - 3:
1187 return os_thread_priority::lowest;
1188 #ifdef __APPLE__
1189 // `SCHED_IDLE` doesn't exist on macOS, so we use the policy `SCHED_OTHER` with a "nice" value of `PRIO_MAX - 2`.
1190 case PRIO_MAX - 2:
1191 return os_thread_priority::idle;
1192 #endif
1193 default:
1194 return std::nullopt;
1195 }
1196 }
1197 return std::nullopt;
1198 #elif defined(__APPLE__)
1199 // On macOS, we distill the choices of scheduling policy and priority into 7 pre-defined levels, for simplicity and portability. The total number of possible combinations of policies and priorities is much larger, so if the value was set via any means other than `BS::this_thread::set_os_thread_priority()`, it may not match one of our pre-defined values.
1200 int policy = 0;
1201 struct sched_param param = {};
1202 if (pthread_getschedparam(pthread_self(), &policy, &param) != 0)
1203 return std::nullopt;
1204 if (policy == SCHED_FIFO && param.sched_priority == sched_get_priority_max(SCHED_FIFO))
1205 {
1206 // The only pre-defined priority that uses SCHED_FIFO and the maximum available priority value is the "realtime" priority.
1207 return os_thread_priority::realtime;
1208 }
1209 if (policy == SCHED_RR && param.sched_priority == sched_get_priority_min(SCHED_RR) + (sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2)
1210 {
1211 // The only pre-defined priority that uses SCHED_RR and a priority in the middle of the available range is the "highest" priority.
1212 return os_thread_priority::highest;
1213 }
1214 if (policy == SCHED_OTHER)
1215 {
1216 // For SCHED_OTHER, the result depends on the specific value of the priority.
1217 if (param.sched_priority == sched_get_priority_max(SCHED_OTHER))
1218 return os_thread_priority::above_normal;
1219 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 2)
1220 return os_thread_priority::normal;
1221 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) * 2 / 3)
1222 return os_thread_priority::below_normal;
1223 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 3)
1224 return os_thread_priority::lowest;
1225 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER))
1226 return os_thread_priority::idle;
1227 return std::nullopt;
1228 }
1229 return std::nullopt;
1230 #endif
1231 }
1232
1239 static bool set_os_thread_priority(const os_thread_priority priority)
1240 {
1241 #if defined(_WIN32)
1242 // On Windows, this is straightforward.
1243 return SetThreadPriority(GetCurrentThread(), static_cast<int>(priority)) != 0;
1244 #elif defined(__linux__)
1245 // On Linux, we distill the choices of scheduling policy, priority, and "nice" value into 7 pre-defined levels, for simplicity and portability. The total number of possible combinations of policies and priorities is much larger, but allowing more fine-grained control would not be portable.
1246 int policy = 0;
1247 struct sched_param param = {};
1248 std::optional<int> nice_val = std::nullopt;
1249 switch (priority)
1250 {
1251 case os_thread_priority::realtime:
1252 // "Realtime" pre-defined priority: We use the policy `SCHED_FIFO` with the highest possible priority.
1253 policy = SCHED_FIFO;
1254 param.sched_priority = sched_get_priority_max(SCHED_FIFO);
1255 break;
1256 case os_thread_priority::highest:
1257 // "Highest" pre-defined priority: We use the policy `SCHED_RR` ("round-robin") with a priority in the middle of the available range.
1258 policy = SCHED_RR;
1259 param.sched_priority = sched_get_priority_min(SCHED_RR) + ((sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2);
1260 break;
1261 case os_thread_priority::above_normal:
1262 // "Above normal" pre-defined priority: We use the policy `SCHED_OTHER` (the default). This policy does not accept a priority value, so priority must be 0. However, we set the "nice" value to the minimum value as given by `PRIO_MIN`, plus 2 (which should evaluate to -18). The usual range is -20 to 19 or 20, with higher values corresponding to lower priorities.
1263 policy = SCHED_OTHER;
1264 param.sched_priority = 0;
1265 nice_val = PRIO_MIN + 2;
1266 break;
1267 case os_thread_priority::normal:
1268 // "Normal" pre-defined priority: We use the policy `SCHED_OTHER`, priority must be 0, and we set the "nice" value to 0 (the default).
1269 policy = SCHED_OTHER;
1270 param.sched_priority = 0;
1271 nice_val = 0;
1272 break;
1273 case os_thread_priority::below_normal:
1274 // "Below normal" pre-defined priority: We use the policy `SCHED_OTHER`, priority must be 0, and we set the "nice" value to half the maximum value as given by `PRIO_MAX`, rounded up (which should evaluate to 10).
1275 policy = SCHED_OTHER;
1276 param.sched_priority = 0;
1277 nice_val = (PRIO_MAX / 2) + (PRIO_MAX % 2);
1278 break;
1279 case os_thread_priority::lowest:
1280 // "Lowest" pre-defined priority: We use the policy `SCHED_OTHER`, priority must be 0, and we set the "nice" value to the maximum value as given by `PRIO_MAX`, minus 3 (which should evaluate to 17).
1281 policy = SCHED_OTHER;
1282 param.sched_priority = 0;
1283 nice_val = PRIO_MAX - 3;
1284 break;
1285 case os_thread_priority::idle:
1286 // "Idle" pre-defined priority on Linux: We use the policy `SCHED_IDLE`, priority must be 0, and we don't touch the "nice" value.
1287 policy = SCHED_IDLE;
1288 param.sched_priority = 0;
1289 break;
1290 default:
1291 return false;
1292 }
1293 bool success = (pthread_setschedparam(pthread_self(), policy, &param) == 0);
1294 if (nice_val.has_value())
1295 success = success && (setpriority(PRIO_PROCESS, static_cast<id_t>(syscall(SYS_gettid)), nice_val.value()) == 0);
1296 return success;
1297 #elif defined(__APPLE__)
1298 // On macOS, unlike Linux, the "nice" value is per-process, not per-thread (in compliance with the POSIX standard). However, unlike Linux, `SCHED_OTHER` on macOS does have a range of priorities. So for `realtime` and `highest` priorities we use `SCHED_FIFO` and `SCHED_RR` respectively as for Linux, but for the other priorities we use `SCHED_OTHER` with a priority in the range given by `sched_get_priority_min(SCHED_OTHER)` to `sched_get_priority_max(SCHED_OTHER)`.
1299 int policy = 0;
1300 struct sched_param param = {};
1301 switch (priority)
1302 {
1303 case os_thread_priority::realtime:
1304 // "Realtime" pre-defined priority: We use the policy `SCHED_FIFO` with the highest possible priority.
1305 policy = SCHED_FIFO;
1306 param.sched_priority = sched_get_priority_max(SCHED_FIFO);
1307 break;
1308 case os_thread_priority::highest:
1309 // "Highest" pre-defined priority: We use the policy `SCHED_RR` ("round-robin") with a priority in the middle of the available range.
1310 policy = SCHED_RR;
1311 param.sched_priority = sched_get_priority_min(SCHED_RR) + (sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2;
1312 break;
1313 case os_thread_priority::above_normal:
1314 // "Above normal" pre-defined priority: We use the policy `SCHED_OTHER` (the default) with the highest possible priority.
1315 policy = SCHED_OTHER;
1316 param.sched_priority = sched_get_priority_max(SCHED_OTHER);
1317 break;
1318 case os_thread_priority::normal:
1319 // "Normal" pre-defined priority: We use the policy `SCHED_OTHER` (the default) with a priority in the middle of the available range (which appears to be the default?).
1320 policy = SCHED_OTHER;
1321 param.sched_priority = sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 2;
1322 break;
1323 case os_thread_priority::below_normal:
1324 // "Below normal" pre-defined priority: We use the policy `SCHED_OTHER` (the default) with a priority 2/3rds of the way up the available range, that is, `min + (max - min) * 2 / 3`.
1325 policy = SCHED_OTHER;
1326 param.sched_priority = sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) * 2 / 3;
1327 break;
1328 case os_thread_priority::lowest:
1329 // "Lowest" pre-defined priority: We use the policy `SCHED_OTHER` (the default) with a priority 1/3rd of the way up the available range, that is, `min + (max - min) / 3`.
1330 policy = SCHED_OTHER;
1331 param.sched_priority = sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 3;
1332 break;
1333 case os_thread_priority::idle:
1334 // "Idle" pre-defined priority on macOS: We use the policy `SCHED_OTHER` (the default) with the lowest possible priority.
1335 policy = SCHED_OTHER;
1336 param.sched_priority = sched_get_priority_min(SCHED_OTHER);
1337 break;
1338 default:
1339 return false;
1340 }
1341 return pthread_setschedparam(pthread_self(), policy, &param) == 0;
1342 #endif
1343 }
1344#endif
1345
1346private:
1347 inline static thread_local std::optional<std::size_t> my_index = std::nullopt;
1348 inline static thread_local std::optional<void*> my_pool = std::nullopt;
1349}; // class this_thread
1350
1358template <typename T1, typename T2, typename Enable = void>
1359struct common_index_type
1360{
1361 // Fallback to `std::common_type_t` if no specialization matches.
1362 using type = std::common_type_t<T1, T2>;
1363};
1364
1365// The common type of two signed integers is the larger of the integers, with the same signedness.
1366template <typename T1, typename T2>
1367struct common_index_type<T1, T2, std::enable_if_t<std::is_signed_v<T1> && std::is_signed_v<T2>>>
1368{
1369 using type = std::conditional_t<(sizeof(T1) >= sizeof(T2)), T1, T2>;
1370};
1371
1372// The common type of two unsigned integers is the larger of the integers, with the same signedness.
1373template <typename T1, typename T2>
1374struct common_index_type<T1, T2, std::enable_if_t<std::is_unsigned_v<T1> && std::is_unsigned_v<T2>>>
1375{
1376 using type = std::conditional_t<(sizeof(T1) >= sizeof(T2)), T1, T2>;
1377};
1378
1379// The common type of a signed and an unsigned integer is a signed integer that can hold the full ranges of both integers.
1380template <typename T1, typename T2>
1381struct common_index_type<T1, T2, std::enable_if_t<(std::is_signed_v<T1> && std::is_unsigned_v<T2>) || (std::is_unsigned_v<T1> && std::is_signed_v<T2>)>>
1382{
1383 using S = std::conditional_t<std::is_signed_v<T1>, T1, T2>;
1384 using U = std::conditional_t<std::is_unsigned_v<T1>, T1, T2>;
1385 static constexpr std::size_t larger_size = (sizeof(S) > sizeof(U)) ? sizeof(S) : sizeof(U);
1386 using type = std::conditional_t<larger_size <= 4,
1387 // If both integers are 32 bits or less, the common type should be a signed type that can hold both of them. If both are 8 bits, or the signed type is 16 bits and the unsigned type is 8 bits, the common type is `std::int16_t`. Otherwise, if both are 16 bits, or the signed type is 32 bits and the unsigned type is smaller, the common type is `std::int32_t`. Otherwise, if both are 32 bits or less, the common type is `std::int64_t`.
1388 std::conditional_t<larger_size == 1 || (sizeof(S) == 2 && sizeof(U) == 1), std::int16_t, std::conditional_t<larger_size == 2 || (sizeof(S) == 4 && sizeof(U) < 4), std::int32_t, std::int64_t>>,
1389 // If the unsigned integer is 64 bits, the common type should also be an unsigned 64-bit integer, that is, `std::uint64_t`. The reason is that the most common scenario where this might happen is where the indices go from 0 to `x` where `x` has been previously defined as `std::size_t`, e.g. the size of a vector. Note that this will fail if the first index is negative; in that case, the user must cast the indices explicitly to the desired common type. If the unsigned integer is not 64 bits, then the signed integer must be 64 bits, hence the common type is `std::int64_t`.
1390 std::conditional_t<sizeof(U) == 8, std::uint64_t, std::int64_t>>;
1391};
1392
1399template <typename T1, typename T2>
1400using common_index_type_t = typename common_index_type<T1, T2>::type;
1401
1406
1411
1416
1421
1427template <tp OptFlags = tp::none>
1428class [[nodiscard]] thread_pool
1429{
1430public:
1434 static constexpr bool priority_enabled = (OptFlags & tp::priority) != tp::none;
1435
1439 static constexpr bool pause_enabled = (OptFlags & tp::pause) != tp::none;
1440
1444 static constexpr bool wait_deadlock_checks_enabled = (OptFlags & tp::wait_deadlock_checks) != tp::none;
1445
1446#ifndef __cpp_exceptions
1447 static_assert(!wait_deadlock_checks_enabled, "Wait deadlock checks cannot be enabled if exception handling is disabled.");
1448#endif
1449
1450 // ============================
1451 // Constructors and destructors
1452 // ============================
1453
1457 thread_pool() : thread_pool(0, [] {}) {}
1458
1464 explicit thread_pool(const std::size_t num_threads) : thread_pool(num_threads, [] {}) {}
1465
1471 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1472 explicit thread_pool(F&& init) : thread_pool(0, std::forward<F>(init)) {}
1473
1480 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1481 thread_pool(const std::size_t num_threads, F&& init)
1482 {
1483 create_threads(num_threads, std::forward<F>(init));
1484 }
1485
1486 // The copy and move constructors and assignment operators are deleted. The thread pool cannot be copied or moved.
1487 thread_pool(const thread_pool&) = delete;
1488 thread_pool(thread_pool&&) = delete;
1489 thread_pool& operator=(const thread_pool&) = delete;
1490 thread_pool& operator=(thread_pool&&) = delete;
1491
1495 ~thread_pool() noexcept
1496 {
1497#ifdef __cpp_exceptions
1498 try
1499 {
1500#endif
1501 wait();
1502#ifndef __cpp_lib_jthread
1503 destroy_threads();
1504#endif
1505#ifdef __cpp_exceptions
1506 }
1507 catch (...)
1508 {
1509 }
1510#endif
1511 }
1512
1513 // =======================
1514 // Public member functions
1515 // =======================
1516
1530 template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>, typename F>
1531 void detach_blocks(const T1 first_index, const T2 index_after_last, F&& block, const std::size_t num_blocks = 0, const priority_t priority = 0)
1532 {
1533 enqueue_blocks<T, F, void, false>(static_cast<T>(first_index), static_cast<T>(index_after_last), std::forward<F>(block), num_blocks, priority);
1534 }
1535
1544 template <typename I>
1545 void detach_bulk(const I first, const I last, const priority_t priority = 0)
1546 {
1547 if (first != last)
1548 {
1549 bool notify = false;
1550 {
1551 const std::scoped_lock tasks_lock(tasks_mutex);
1552 if constexpr (pause_enabled)
1553 notify = tasks.empty() && !paused;
1554 else
1555 notify = tasks.empty();
1556 for (I it = first; it != last; ++it)
1557 {
1558 if constexpr (priority_enabled)
1559 tasks.emplace(std::move(*it), priority);
1560 else
1561 tasks.emplace(std::move(*it));
1562 }
1563 }
1564 if (notify)
1565 task_available_cv.notify_all();
1566 }
1567 }
1568
1576 template <typename C>
1577 void detach_bulk(C& container, const priority_t priority = 0)
1578 {
1579 detach_bulk(std::begin(container), std::end(container), priority);
1580 }
1581
1595 template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>, typename F>
1596 void detach_loop(const T1 first_index, const T2 index_after_last, F&& loop, const std::size_t num_blocks = 0, const priority_t priority = 0)
1597 {
1598 enqueue_loop<T, F, false>(static_cast<T>(first_index), static_cast<T>(index_after_last), std::forward<F>(loop), num_blocks, priority);
1599 }
1600
1613 template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>, typename F>
1614 void detach_sequence(const T1 first_index, const T2 index_after_last, F&& sequence, const priority_t priority = 0)
1615 {
1616 return enqueue_sequence<T, F, void, false>(static_cast<T>(first_index), static_cast<T>(index_after_last), std::forward<F>(sequence), priority);
1617 }
1618
1626 template <typename F>
1627 void detach_task(F&& task, const priority_t priority = 0)
1628 {
1629 {
1630 const std::scoped_lock tasks_lock(tasks_mutex);
1631 if constexpr (priority_enabled)
1632 tasks.emplace(std::forward<F>(task), priority);
1633 else
1634 tasks.emplace(std::forward<F>(task));
1635 }
1636 task_available_cv.notify_one();
1637 }
1638
1639#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
1645 [[nodiscard]] std::vector<thread_t::native_handle_type> get_native_handles() const
1646 {
1647 std::vector<thread_t::native_handle_type> native_handles(thread_count);
1648 for (std::size_t i = 0; i < thread_count; ++i)
1649 native_handles[i] = threads[i].native_handle();
1650 return native_handles;
1651 }
1652#endif
1653
1659 [[nodiscard]] std::size_t get_tasks_queued() const
1660 {
1661 const std::scoped_lock tasks_lock(tasks_mutex);
1662 return tasks.size();
1663 }
1664
1670 [[nodiscard]] std::size_t get_tasks_running() const
1671 {
1672 const std::scoped_lock tasks_lock(tasks_mutex);
1673 return tasks_running;
1674 }
1675
1681 [[nodiscard]] std::size_t get_tasks_total() const
1682 {
1683 const std::scoped_lock tasks_lock(tasks_mutex);
1684 return tasks_running + tasks.size();
1685 }
1686
1692 [[nodiscard]] std::size_t get_thread_count() const noexcept
1693 {
1694 return thread_count;
1695 }
1696
1702 [[nodiscard]] std::vector<thread_t::id> get_thread_ids() const
1703 {
1704 std::vector<thread_t::id> thread_ids(thread_count);
1705 for (std::size_t i = 0; i < thread_count; ++i)
1706 thread_ids[i] = threads[i].get_id();
1707 return thread_ids;
1708 }
1709
1715 BS_THREAD_POOL_IF_PAUSE_ENABLED
1716 [[nodiscard]] bool is_paused() const
1717 {
1718 const std::scoped_lock tasks_lock(tasks_mutex);
1719 return paused;
1720 }
1721
1725 BS_THREAD_POOL_IF_PAUSE_ENABLED
1726 void pause()
1727 {
1728 const std::scoped_lock tasks_lock(tasks_mutex);
1729 paused = true;
1730 }
1731
1735 void purge()
1736 {
1737 const std::scoped_lock tasks_lock(tasks_mutex);
1738 tasks = {};
1739 }
1740
1744 void reset()
1745 {
1746 reset(0, [](std::size_t) {});
1747 }
1748
1754 void reset(const std::size_t num_threads)
1755 {
1756 reset(num_threads, [](std::size_t) {});
1757 }
1758
1764 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1765 void reset(F&& init)
1766 {
1767 reset(0, std::forward<F>(init));
1768 }
1769
1776 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1777 void reset(const std::size_t num_threads, F&& init)
1778 {
1779 if constexpr (pause_enabled)
1780 {
1781 std::unique_lock tasks_lock(tasks_mutex);
1782 const bool was_paused = paused;
1783 paused = true;
1784 tasks_lock.unlock();
1785 reset_pool(num_threads, std::forward<F>(init));
1786 tasks_lock.lock();
1787 paused = was_paused;
1788 tasks_lock.unlock();
1789 if (!was_paused)
1790 task_available_cv.notify_all();
1791 }
1792 else
1793 {
1794 reset_pool(num_threads, std::forward<F>(init));
1795 }
1796 }
1797
1803 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1804 void set_cleanup_func(F&& cleanup)
1805 {
1806 if constexpr (std::is_invocable_v<F, std::size_t>)
1807 {
1808 cleanup_func = std::forward<F>(cleanup);
1809 }
1810 else
1811 {
1812 cleanup_func = [cleanup = std::forward<F>(cleanup)](std::size_t)
1813 {
1814 cleanup();
1815 };
1816 }
1817 }
1818
1834 template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>, typename F, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
1835 [[nodiscard]] multi_future<R> submit_blocks(const T1 first_index, const T2 index_after_last, F&& block, const std::size_t num_blocks = 0, const priority_t priority = 0)
1836 {
1837 return enqueue_blocks<T, F, R, true>(static_cast<T>(first_index), static_cast<T>(index_after_last), std::forward<F>(block), num_blocks, priority);
1838 }
1839
1851 template <typename I, typename F = decltype(*std::declval<I>()), typename R = std::invoke_result_t<std::decay_t<F>>>
1852 [[nodiscard]] multi_future<R> submit_bulk(const I first, const I last, const priority_t priority = 0)
1853 {
1854 if (first != last)
1855 {
1856 const std::size_t num_tasks = static_cast<std::size_t>(std::distance(first, last));
1857 multi_future<R> all_futures;
1858 all_futures.reserve(num_tasks);
1859 std::vector<task_t> all_tasks;
1860 all_tasks.reserve(num_tasks);
1861 for (I it = first; it != last; ++it)
1862 {
1863 task_and_future<R> ft(std::move(*it));
1864 all_futures.emplace_back(std::move(ft.future));
1865 all_tasks.emplace_back(std::move(ft.task));
1866 }
1867 detach_bulk(all_tasks, priority);
1868 return all_futures;
1869 }
1870 return {};
1871 }
1872
1883 template <typename C, typename F = decltype(*std::declval<C&>().begin()), typename R = std::invoke_result_t<std::decay_t<F>>>
1884 [[nodiscard]] multi_future<R> submit_bulk(C& container, const priority_t priority = 0)
1885 {
1886 return submit_bulk(std::begin(container), std::end(container), priority);
1887 }
1888
1903 template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>, typename F>
1904 [[nodiscard]] multi_future<void> submit_loop(const T1 first_index, const T2 index_after_last, F&& loop, const std::size_t num_blocks = 0, const priority_t priority = 0)
1905 {
1906 return enqueue_loop<T, F, true>(static_cast<T>(first_index), static_cast<T>(index_after_last), std::forward<F>(loop), num_blocks, priority);
1907 }
1908
1923 template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>, typename F, typename R = std::invoke_result_t<std::decay_t<F>, T>>
1924 [[nodiscard]] multi_future<R> submit_sequence(const T1 first_index, const T2 index_after_last, F&& sequence, const priority_t priority = 0)
1925 {
1926 return enqueue_sequence<T, F, R, true>(static_cast<T>(first_index), static_cast<T>(index_after_last), std::forward<F>(sequence), priority);
1927 }
1928
1938 template <typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
1939 [[nodiscard]] std::future<R> submit_task(F&& task, const priority_t priority = 0)
1940 {
1941 task_and_future<R> ft(std::forward<F>(task));
1942 detach_task(std::move(ft.task), priority);
1943 return std::move(ft.future);
1944 }
1945
1949 BS_THREAD_POOL_IF_PAUSE_ENABLED
1950 void unpause()
1951 {
1952 {
1953 const std::scoped_lock tasks_lock(tasks_mutex);
1954 paused = false;
1955 }
1956 task_available_cv.notify_all();
1957 }
1958
1964 void wait()
1965 {
1966#ifdef __cpp_exceptions
1967 if constexpr (wait_deadlock_checks_enabled)
1968 {
1969 if (this_thread::get_pool() == this)
1970 throw wait_deadlock();
1971 }
1972#endif
1973 std::unique_lock tasks_lock(tasks_mutex);
1974 waiting = true;
1975 tasks_done_cv.wait(tasks_lock,
1976 [this]
1977 {
1978 if constexpr (pause_enabled)
1979 return (tasks_running == 0) && (paused || tasks.empty());
1980 else
1981 return (tasks_running == 0) && tasks.empty();
1982 });
1983 waiting = false;
1984 }
1985
1995 template <typename R, typename P>
1996 bool wait_for(const std::chrono::duration<R, P>& duration)
1997 {
1998#ifdef __cpp_exceptions
1999 if constexpr (wait_deadlock_checks_enabled)
2000 {
2001 if (this_thread::get_pool() == this)
2002 throw wait_deadlock();
2003 }
2004#endif
2005 std::unique_lock tasks_lock(tasks_mutex);
2006 waiting = true;
2007 const bool status = tasks_done_cv.wait_for(tasks_lock, duration,
2008 [this]
2009 {
2010 if constexpr (pause_enabled)
2011 return (tasks_running == 0) && (paused || tasks.empty());
2012 else
2013 return (tasks_running == 0) && tasks.empty();
2014 });
2015 waiting = false;
2016 return status;
2017 }
2018
2028 template <typename C, typename D>
2029 bool wait_until(const std::chrono::time_point<C, D>& timeout_time)
2030 {
2031#ifdef __cpp_exceptions
2032 if constexpr (wait_deadlock_checks_enabled)
2033 {
2034 if (this_thread::get_pool() == this)
2035 throw wait_deadlock();
2036 }
2037#endif
2038 std::unique_lock tasks_lock(tasks_mutex);
2039 waiting = true;
2040 const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time,
2041 [this]
2042 {
2043 if constexpr (pause_enabled)
2044 return (tasks_running == 0) && (paused || tasks.empty());
2045 else
2046 return (tasks_running == 0) && tasks.empty();
2047 });
2048 waiting = false;
2049 return status;
2050 }
2051
2052private:
2053 // ========================
2054 // Private member functions
2055 // ========================
2056
2063 template <typename F>
2064 void create_threads(const std::size_t num_threads, F&& init)
2065 {
2066 if constexpr (std::is_invocable_v<F, std::size_t>)
2067 {
2068 init_func = std::forward<F>(init);
2069 }
2070 else
2071 {
2072 init_func = [init = std::forward<F>(init)](std::size_t)
2073 {
2074 init();
2075 };
2076 }
2077 thread_count = determine_thread_count(num_threads);
2078 threads = std::make_unique<thread_t[]>(thread_count);
2079 {
2080 const std::scoped_lock tasks_lock(tasks_mutex);
2081 tasks_running = thread_count;
2082#ifndef __cpp_lib_jthread
2083 workers_running = true;
2084#endif
2085 }
2086 for (std::size_t i = 0; i < thread_count; ++i)
2087 {
2088 threads[i] = thread_t(
2089 [this, i]
2090#ifdef __cpp_lib_jthread
2091 (const std::stop_token& stop_token)
2092 {
2093 worker(stop_token, i);
2094 }
2095#else
2096 {
2097 worker(i);
2098 }
2099#endif
2100 );
2101 }
2102 }
2103
2104#ifndef __cpp_lib_jthread
2108 void destroy_threads()
2109 {
2110 {
2111 const std::scoped_lock tasks_lock(tasks_mutex);
2112 workers_running = false;
2113 }
2114 task_available_cv.notify_all();
2115 for (std::size_t i = 0; i < thread_count; ++i)
2116 threads[i].join();
2117 }
2118#endif
2119
2125 [[nodiscard]] static std::size_t determine_thread_count(const std::size_t num_threads) noexcept(!thread_pool_native_extensions)
2126 {
2127 if (num_threads > 0)
2128 return num_threads;
2129#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
2130 const std::optional<std::vector<bool>> affinity = BS::get_os_process_affinity();
2131 if (affinity.has_value())
2132 {
2133 const std::size_t affinity_thread_count = static_cast<std::size_t>(std::count(affinity->begin(), affinity->end(), true));
2134 return (affinity_thread_count > 0) ? affinity_thread_count : 1;
2135 }
2136#endif
2137 if (thread_t::hardware_concurrency() > 0)
2138 return thread_t::hardware_concurrency();
2139 return 1;
2140 }
2141
2157 template <typename T, typename F, typename R, bool submit, typename N = std::conditional_t<submit, multi_future<R>, void>>
2158 [[nodiscard]] N enqueue_blocks(const T first_index, const T index_after_last, F&& block, std::size_t num_blocks, const priority_t priority = 0)
2159 {
2160 if (index_after_last > first_index)
2161 {
2162 using block_task_t = block_task<T, F, R>;
2163 const std::shared_ptr<std::decay_t<F>> block_ptr = std::make_shared<std::decay_t<F>>(std::forward<F>(block));
2164 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
2165 num_blocks = blks.get_num_blocks();
2166 std::vector<std::conditional_t<submit, block_task_t, task_t>> all_tasks;
2167 all_tasks.reserve(num_blocks);
2168 for (std::size_t i = 0; i < num_blocks; ++i)
2169 all_tasks.emplace_back(block_task_t{block_ptr, blks.start(i), blks.end(i)});
2170 if constexpr (submit)
2171 return submit_bulk(all_tasks, priority);
2172 else
2173 detach_bulk(all_tasks, priority);
2174 }
2175 return N();
2176 }
2177
2192 template <typename T, typename F, bool submit, typename N = std::conditional_t<submit, multi_future<void>, void>>
2193 [[nodiscard]] N enqueue_loop(const T first_index, const T index_after_last, F&& loop, std::size_t num_blocks, const priority_t priority = 0)
2194 {
2195 if (index_after_last > first_index)
2196 {
2197 using loop_task_t = loop_task<T, F>;
2198 const std::shared_ptr<std::decay_t<F>> loop_ptr = std::make_shared<std::decay_t<F>>(std::forward<F>(loop));
2199 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
2200 num_blocks = blks.get_num_blocks();
2201 std::vector<std::conditional_t<submit, loop_task_t, task_t>> all_tasks;
2202 all_tasks.reserve(num_blocks);
2203 for (std::size_t i = 0; i < num_blocks; ++i)
2204 all_tasks.emplace_back(loop_task_t{loop_ptr, blks.start(i), blks.end(i)});
2205 if constexpr (submit)
2206 return submit_bulk(all_tasks, priority);
2207 else
2208 detach_bulk(all_tasks, priority);
2209 }
2210 return N();
2211 }
2212
2227 template <typename T, typename F, typename R, bool submit, typename N = std::conditional_t<submit, multi_future<R>, void>>
2228 [[nodiscard]] N enqueue_sequence(const T first_index, const T index_after_last, F&& sequence, const priority_t priority = 0)
2229 {
2230 if (index_after_last > first_index)
2231 {
2232 using sequence_task_t = sequence_task<T, F, R>;
2233 const std::shared_ptr<std::decay_t<F>> sequence_ptr = std::make_shared<std::decay_t<F>>(std::forward<F>(sequence));
2234 std::vector<std::conditional_t<submit, sequence_task_t, task_t>> all_tasks;
2235 all_tasks.reserve(static_cast<std::size_t>(index_after_last - first_index));
2236 for (T i = first_index; i < index_after_last; ++i)
2237 all_tasks.emplace_back(sequence_task_t{sequence_ptr, i});
2238 if constexpr (submit)
2239 return submit_bulk(all_tasks, priority);
2240 else
2241 detach_bulk(all_tasks, priority);
2242 }
2243 return N();
2244 }
2245
2251 [[nodiscard]] task_t pop_task()
2252 {
2253 task_t task;
2254 if constexpr (priority_enabled)
2255 task = std::move(tasks.top().task);
2256 else
2257 task = std::move(tasks.front());
2258 tasks.pop();
2259 return task;
2260 }
2261
2268 template <typename F>
2269 void reset_pool(const std::size_t num_threads, F&& init)
2270 {
2271 wait();
2272#ifndef __cpp_lib_jthread
2273 destroy_threads();
2274#endif
2275 create_threads(num_threads, std::forward<F>(init));
2276 }
2277
2283 void worker(BS_THREAD_POOL_WORKER_TOKEN const std::size_t idx)
2284 {
2285 this_thread::my_pool = this;
2286 this_thread::my_index = idx;
2287 init_func(idx);
2288 while (true)
2289 {
2290 std::unique_lock tasks_lock(tasks_mutex);
2291 --tasks_running;
2292 if constexpr (pause_enabled)
2293 {
2294 if (waiting && (tasks_running == 0) && (paused || tasks.empty()))
2295 tasks_done_cv.notify_all();
2296 }
2297 else
2298 {
2299 if (waiting && (tasks_running == 0) && tasks.empty())
2300 tasks_done_cv.notify_all();
2301 }
2302 task_available_cv.wait(tasks_lock BS_THREAD_POOL_WAIT_TOKEN,
2303 [this]
2304 {
2305 if constexpr (pause_enabled)
2306 return !(paused || tasks.empty()) BS_THREAD_POOL_OR_STOP_CONDITION;
2307 else
2308 return !tasks.empty() BS_THREAD_POOL_OR_STOP_CONDITION;
2309 });
2310 if (BS_THREAD_POOL_STOP_CONDITION)
2311 break;
2312 {
2313 task_t task = pop_task(); // NOLINT(misc-const-correctness) In C++23 this cannot be const since `std::move_only_function::operator()` is not a const member function.
2314 ++tasks_running;
2315 tasks_lock.unlock();
2316#ifdef __cpp_exceptions
2317 try
2318 {
2319#endif
2320 task();
2321#ifdef __cpp_exceptions
2322 }
2323 catch (...)
2324 {
2325 }
2326#endif
2327 }
2328 }
2329 cleanup_func(idx);
2330 this_thread::my_index = std::nullopt;
2331 this_thread::my_pool = std::nullopt;
2332 }
2333
2334 // ============
2335 // Private data
2336 // ============
2337
2341 mutable std::mutex tasks_mutex;
2342
2346#ifdef __cpp_lib_jthread
2347 std::condition_variable_any
2348#else
2349 std::condition_variable
2350#endif
2351 task_available_cv;
2352
2356 std::condition_variable tasks_done_cv;
2357
2361 move_only_function<void(std::size_t)> cleanup_func = [](std::size_t) {};
2362
2366 move_only_function<void(std::size_t)> init_func = [](std::size_t) {};
2367
2371 std::conditional_t<priority_enabled, std::priority_queue<pr_task>, std::queue<task_t>> tasks;
2372
2376 std::size_t tasks_running = 0;
2377
2381 std::size_t thread_count = 0;
2382
2386 std::unique_ptr<thread_t[]> threads = nullptr;
2387
2391 std::conditional_t<pause_enabled, bool, std::monostate> paused = {};
2392
2396 bool waiting = false;
2397
2398#ifndef __cpp_lib_jthread
2402 bool workers_running = false;
2403#endif
2404}; // class thread_pool
2405
2409class [[nodiscard]] synced_stream
2410{
2411public:
2415 explicit synced_stream()
2416 {
2417 add_stream(std::cout);
2418 }
2419
2426 template <typename... T>
2427 explicit synced_stream(T&... streams)
2428 {
2429 (add_stream(streams), ...);
2430 }
2431
2437 void add_stream(std::ostream& stream)
2438 {
2439 out_streams.push_back(&stream);
2440 }
2441
2447 std::vector<std::ostream*>& get_streams() noexcept
2448 {
2449 return out_streams;
2450 }
2451
2458 template <typename... T>
2459 void print(const T&... items)
2460 {
2461 const std::scoped_lock stream_lock(stream_mutex);
2462 for (std::ostream* const stream : out_streams)
2463 (*stream << ... << items);
2464 }
2465
2472 template <typename... T>
2473 void println(T&&... items)
2474 {
2475 print(std::forward<T>(items)..., '\n');
2476 }
2477
2483 void remove_stream(std::ostream& stream)
2484 {
2485 out_streams.erase(std::remove(out_streams.begin(), out_streams.end(), &stream), out_streams.end());
2486 }
2487
2491 inline static std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
2492
2496 inline static std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);
2497
2498private:
2502 mutable std::mutex stream_mutex;
2503
2507 std::vector<std::ostream*> out_streams;
2508}; // class synced_stream
2509} // namespace BS
2510#endif // BS_THREAD_POOL_HPP
A helper class to divide a range into blocks. Used by detach_blocks(), submit_blocks(),...
Definition BS_thread_pool.hpp:575
T start(const std::size_t block) const noexcept
Get the first index of a block.
Definition BS_thread_pool.hpp:631
T end(const std::size_t block) const noexcept
Get the index after the last index of a block.
Definition BS_thread_pool.hpp:610
std::size_t get_num_blocks() const noexcept
Get the number of blocks. Note that this may be different than the desired number of blocks that was ...
Definition BS_thread_pool.hpp:620
blocks(const T first_index_, const T index_after_last_, const std::size_t num_blocks_) noexcept
Construct a blocks object with the given specifications.
Definition BS_thread_pool.hpp:584
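The block-splitting idea described above can be sketched in isolation. This is a minimal illustration, not the library's `BS::blocks` implementation: the name `range_blocks`, the use of `std::size_t` for indices, and the policy of handing the leftover indices to the first blocks are all assumptions made for the example.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper (not BS::blocks): splits [first_index, index_after_last)
// into at most `desired_blocks` contiguous blocks of nearly equal size.
class range_blocks
{
public:
    range_blocks(const std::size_t first_index, const std::size_t index_after_last, const std::size_t desired_blocks) : first(first_index)
    {
        const std::size_t total = (index_after_last > first_index) ? (index_after_last - first_index) : 0;
        // Never create more blocks than there are indices.
        num_blocks = std::min(desired_blocks, total);
        if (num_blocks > 0)
        {
            block_size = total / num_blocks;
            remainder = total % num_blocks;
        }
    }

    // First index of a block. The first `remainder` blocks get one extra index.
    std::size_t start(const std::size_t block) const noexcept
    {
        return first + (block * block_size) + std::min(block, remainder);
    }

    // Index after the last index of a block (the start of the next block).
    std::size_t end(const std::size_t block) const noexcept
    {
        return start(block + 1);
    }

    // May be smaller than the number of blocks that was requested.
    std::size_t get_num_blocks() const noexcept
    {
        return num_blocks;
    }

private:
    std::size_t first = 0;
    std::size_t num_blocks = 0;
    std::size_t block_size = 0;
    std::size_t remainder = 0;
};
```

With this policy, splitting the ten indices 0 to 9 into three blocks gives the half-open ranges [0, 4), [4, 7), and [7, 10).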
Definition BS_thread_pool.hpp:303
A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
Definition BS_thread_pool.hpp:460
void wait() const
Wait for all the futures stored in this BS::multi_future.
Definition BS_thread_pool.hpp:520
bool valid() const noexcept
Check if all the futures stored in this BS::multi_future are valid.
Definition BS_thread_pool.hpp:509
bool wait_for(const std::chrono::duration< R, P > &duration) const
Wait for all the futures stored in this BS::multi_future, but stop waiting after the specified durati...
Definition BS_thread_pool.hpp:535
std::size_t ready_count() const
Check how many of the futures stored in this BS::multi_future are ready.
Definition BS_thread_pool.hpp:493
std::conditional_t< std::is_void_v< T >, void, std::vector< T > > get()
Get the results from all the futures stored in this BS::multi_future, rethrowing any stored exception...
Definition BS_thread_pool.hpp:470
bool wait_until(const std::chrono::time_point< C, D > &timeout_time) const
Wait for all the futures stored in this BS::multi_future, but stop waiting after the specified time p...
Definition BS_thread_pool.hpp:556
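The operations listed for `BS::multi_future` can be approximated with plain standard-library futures. The sketch below is an assumption-laden illustration, not the library's code: the free functions `get_all` and `count_ready` are hypothetical names, and they operate on a `std::vector<std::future<T>>` rather than on a dedicated class.

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical helper: collect the results of all futures, in order.
// std::future::get() rethrows any exception stored in the future.
template <typename T>
std::vector<T> get_all(std::vector<std::future<T>>& futures)
{
    std::vector<T> results;
    results.reserve(futures.size());
    for (std::future<T>& future : futures)
        results.push_back(future.get());
    return results;
}

// Hypothetical helper: count how many futures already hold a result,
// using a zero-length wait so the call never blocks.
template <typename T>
std::size_t count_ready(const std::vector<std::future<T>>& futures)
{
    std::size_t count = 0;
    for (const std::future<T>& future : futures)
    {
        if (future.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
            ++count;
    }
    return count;
}
```

The zero-length `wait_for` is a common way to poll a future without blocking. It reports `std::future_status::deferred` rather than `ready` for lazily evaluated `std::async` tasks, so this sketch counts only futures whose value has actually been set.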
A utility class to synchronize printing to one or more output streams by different threads.
Definition BS_thread_pool.hpp:2410
std::vector< std::ostream * > out_streams
The output streams to print to.
Definition BS_thread_pool.hpp:2507
std::mutex stream_mutex
A mutex to synchronize printing.
Definition BS_thread_pool.hpp:2502
static std::ostream &(&) endl(std::ostream &)
A stream manipulator to pass to a BS::synced_stream (an explicit cast of std::endl)....
Definition BS_thread_pool.hpp:2491
static std::ostream &(&) flush(std::ostream &)
A stream manipulator to pass to a BS::synced_stream (an explicit cast of std::flush)....
Definition BS_thread_pool.hpp:2496
synced_stream()
Construct a new synced stream which prints to std::cout.
Definition BS_thread_pool.hpp:2415
void add_stream(std::ostream &stream)
Add a stream to the list of output streams to print to.
Definition BS_thread_pool.hpp:2437
void println(T &&... items)
Print any number of items into each output stream, followed by a newline character....
Definition BS_thread_pool.hpp:2473
void remove_stream(std::ostream &stream)
Remove a stream from the list of output streams to print to.
Definition BS_thread_pool.hpp:2483
void print(const T &... items)
Print any number of items into each output stream. Ensures that no other threads print to the streams...
Definition BS_thread_pool.hpp:2459
std::vector< std::ostream * > & get_streams() noexcept
Get a reference to a vector containing pointers to the output streams to print to.
Definition BS_thread_pool.hpp:2447
synced_stream(T &... streams)
Construct a new synced stream which prints to the given output stream(s).
Definition BS_thread_pool.hpp:2427
A class used to obtain information about the current thread and, if native extensions are enabled,...
Definition BS_thread_pool.hpp:967
static std::optional< void * > get_pool() noexcept
Get a pointer to the thread pool that owns the current thread. If this thread belongs to a BS::thread...
Definition BS_thread_pool.hpp:987
static std::optional< std::size_t > get_index() noexcept
Get the index of the current thread. If this thread belongs to a BS::thread_pool object,...
Definition BS_thread_pool.hpp:977
A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread pool class.
Definition BS_thread_pool.hpp:1429
void create_threads(const std::size_t num_threads, F &&init)
Create the threads in the pool and assign a worker to each thread.
Definition BS_thread_pool.hpp:2064
void reset_pool(const std::size_t num_threads, F &&init)
Reset the pool with a new number of threads and a new initialization function. This member function i...
Definition BS_thread_pool.hpp:2269
thread_pool(F &&init)
Construct a new thread pool with the specified initialization function and the default number of thre...
Definition BS_thread_pool.hpp:1472
thread_pool(const std::size_t num_threads)
Construct a new thread pool with the specified number of threads.
Definition BS_thread_pool.hpp:1464
BS_THREAD_POOL_IF_PAUSE_ENABLED bool is_paused() const
Check whether the pool is currently paused. Only enabled if the flag BS::tp::pause is enabled in the ...
Definition BS_thread_pool.hpp:1716
multi_future< R > submit_bulk(const I first, const I last, const priority_t priority=0)
Submit an iterator range containing functions with no arguments into the task queue,...
Definition BS_thread_pool.hpp:1852
std::mutex tasks_mutex
A mutex to synchronize access to the task queue by different threads.
Definition BS_thread_pool.hpp:2341
void detach_blocks(const T1 first_index, const T2 index_after_last, F &&block, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1531
void detach_loop(const T1 first_index, const T2 index_after_last, F &&loop, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1596
void worker(BS_THREAD_POOL_WORKER_TOKEN const std::size_t idx)
A worker function to be assigned to each thread in the pool. Waits until it is notified by detach_tas...
Definition BS_thread_pool.hpp:2283
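The wait-and-notify structure of `worker()` shown in the listing (sleep on a condition variable with a predicate, pop a task, release the lock while the task runs, then notify waiters when nothing is left) can be reduced to a single-thread sketch. Everything here is an illustrative assumption rather than the library's code: the class name `mini_worker`, the use of `std::function` for tasks, and the choice to drain the queue before stopping.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical single-worker task queue, for illustration only.
class mini_worker
{
public:
    mini_worker() : thread([this] { run(); }) {}
    mini_worker(const mini_worker&) = delete;
    mini_worker& operator=(const mini_worker&) = delete;

    ~mini_worker()
    {
        {
            const std::scoped_lock lock(mutex);
            stopping = true;
        }
        task_available.notify_all();
        thread.join();
    }

    void push(std::function<void()> task)
    {
        {
            const std::scoped_lock lock(mutex);
            tasks.push(std::move(task));
        }
        task_available.notify_one();
    }

    // Block until the queue is empty and no task is running.
    void wait()
    {
        std::unique_lock lock(mutex);
        waiting = true;
        tasks_done.wait(lock, [this] { return tasks_running == 0 && tasks.empty(); });
        waiting = false;
    }

private:
    void run()
    {
        std::unique_lock lock(mutex);
        while (true)
        {
            // The predicate guards against spurious wakeups.
            task_available.wait(lock, [this] { return !tasks.empty() || stopping; });
            if (tasks.empty())
                break; // Only reachable when stopping; queued tasks are drained first.
            std::function<void()> task = std::move(tasks.front());
            tasks.pop();
            ++tasks_running;
            lock.unlock(); // Run the task without holding the lock.
            try
            {
                task();
            }
            catch (...)
            {
                // Swallowed, as in the listing, so one failing task cannot kill the worker.
            }
            lock.lock();
            --tasks_running;
            if (waiting && tasks_running == 0 && tasks.empty())
                tasks_done.notify_all();
        }
    }

    std::mutex mutex;
    std::condition_variable task_available;
    std::condition_variable tasks_done;
    std::queue<std::function<void()>> tasks;
    std::size_t tasks_running = 0;
    bool waiting = false;
    bool stopping = false;
    std::thread thread; // Declared last so every other member exists before run() starts.
};
```

One difference from the listing is deliberate: this sketch keeps running queued tasks after a stop request, whereas the listing breaks out of the loop as soon as its stop condition is observed.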
std::future< R > submit_task(F &&task, const priority_t priority=0)
Submit a function with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:1939
std::condition_variable task_available_cv
A condition variable to notify worker() that a new task has become available.
Definition BS_thread_pool.hpp:2351
thread_pool()
Construct a new thread pool. The number of threads will be the total number of hardware threads avail...
Definition BS_thread_pool.hpp:1457
std::condition_variable tasks_done_cv
A condition variable to notify wait() that the tasks are done.
Definition BS_thread_pool.hpp:2356
void reset(const std::size_t num_threads)
Reset the pool with a new number of threads. Waits for all tasks to be completed, both running and qu...
Definition BS_thread_pool.hpp:1754
BS_THREAD_POOL_IF_PAUSE_ENABLED void pause()
Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue,...
Definition BS_thread_pool.hpp:1726
multi_future< void > submit_loop(const T1 first_index, const T2 index_after_last, F &&loop, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1904
void reset(const std::size_t num_threads, F &&init)
Reset the pool with a new number of threads and a new initialization function. Waits for all tasks to...
Definition BS_thread_pool.hpp:1777
void purge()
Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected,...
Definition BS_thread_pool.hpp:1735
thread_pool(const std::size_t num_threads, F &&init)
Construct a new thread pool with the specified number of threads and initialization function.
Definition BS_thread_pool.hpp:1481
std::size_t get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition BS_thread_pool.hpp:1659
N enqueue_loop(const T first_index, const T index_after_last, F &&loop, std::size_t num_blocks, const priority_t priority=0)
A helper function for detach_loop() and submit_loop().
Definition BS_thread_pool.hpp:2193
multi_future< R > submit_blocks(const T1 first_index, const T2 index_after_last, F &&block, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1835
std::conditional_t< priority_enabled, std::priority_queue< pr_task >, std::queue< task_t > > tasks
A queue of tasks to be executed by the threads.
Definition BS_thread_pool.hpp:2371
std::vector< thread_t::id > get_thread_ids() const
Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::th...
Definition BS_thread_pool.hpp:1702
void set_cleanup_func(F &&cleanup)
Set the thread pool's cleanup function.
Definition BS_thread_pool.hpp:1804
static std::size_t determine_thread_count(const std::size_t num_threads) noexcept(!thread_pool_native_extensions)
Determine how many threads the pool should have, based on the parameter passed to the constructor or ...
Definition BS_thread_pool.hpp:2125
std::size_t get_tasks_total() const
Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread....
Definition BS_thread_pool.hpp:1681
bool wait_for(const std::chrono::duration< R, P > &duration)
Wait for tasks to be completed, but stop waiting after the specified duration has passed.
Definition BS_thread_pool.hpp:1996
void detach_bulk(const I first, const I last, const priority_t priority=0)
Submit an iterator range containing functions with no arguments and no return values into the task qu...
Definition BS_thread_pool.hpp:1545
void detach_bulk(C &container, const priority_t priority=0)
Submit a container of functions with no arguments and no return values into the task queue,...
Definition BS_thread_pool.hpp:1577
void reset(F &&init)
Reset the pool with the default number of threads and a new initialization function....
Definition BS_thread_pool.hpp:1765
N enqueue_sequence(const T first_index, const T index_after_last, F &&sequence, const priority_t priority=0)
A helper function for detach_sequence() and submit_sequence().
Definition BS_thread_pool.hpp:2228
std::size_t get_thread_count() const noexcept
Get the number of threads in the pool.
Definition BS_thread_pool.hpp:1692
N enqueue_blocks(const T first_index, const T index_after_last, F &&block, std::size_t num_blocks, const priority_t priority=0)
A helper function for detach_blocks() and submit_blocks().
Definition BS_thread_pool.hpp:2158
BS_THREAD_POOL_IF_PAUSE_ENABLED void unpause()
Unpause the pool. The workers will resume retrieving new tasks out of the queue. Only enabled if the ...
Definition BS_thread_pool.hpp:1950
void detach_task(F &&task, const priority_t priority=0)
Submit a function with no arguments and no return value into the task queue, with the specified prior...
Definition BS_thread_pool.hpp:1627
multi_future< R > submit_sequence(const T1 first_index, const T2 index_after_last, F &&sequence, const priority_t priority=0)
Submit a sequence of tasks enumerated by indices to the queue, with the specified priority....
Definition BS_thread_pool.hpp:1924
task_t pop_task()
Pop a task from the queue.
Definition BS_thread_pool.hpp:2251
void reset()
Reset the pool with the default number of threads (as if constructed with the default constructor)....
Definition BS_thread_pool.hpp:1744
bool wait_until(const std::chrono::time_point< C, D > &timeout_time)
Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
Definition BS_thread_pool.hpp:2029
void wait()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
Definition BS_thread_pool.hpp:1964
multi_future< R > submit_bulk(C &container, const priority_t priority=0)
Submit a container of functions with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:1884
void destroy_threads()
Destroy the threads in the pool.
Definition BS_thread_pool.hpp:2108
std::size_t get_tasks_running() const
Get the number of tasks currently being executed by the threads.
Definition BS_thread_pool.hpp:1670
~thread_pool() noexcept
Destruct the thread pool. Waits for all tasks to complete, then destroys all threads....
Definition BS_thread_pool.hpp:1495
void detach_sequence(const T1 first_index, const T2 index_after_last, F &&sequence, const priority_t priority=0)
Submit a sequence of tasks enumerated by indices to the queue, with the specified priority....
Definition BS_thread_pool.hpp:1614
A namespace used by Barak Shoshany's projects.
Definition BS_thread_pool.hpp:134
std::uint8_t opt_t
The type used for the bitmask template parameter of the thread pool.
Definition BS_thread_pool.hpp:244
constexpr bool thread_pool_native_extensions
A flag indicating whether the thread pool library's native extensions are enabled.
Definition BS_thread_pool.hpp:238
std::int8_t priority_t
A type used to indicate the priority of a task. Defined to be a signed integer with a width of exactl...
Definition BS_thread_pool.hpp:392
constexpr bool thread_pool_import_std
A flag indicating whether the thread pool library imported the C++23 Standard Library module using im...
Definition BS_thread_pool.hpp:226
typename common_index_type< T1, T2 >::type common_index_type_t
A helper type alias to obtain the common type from the template BS::common_index_type.
Definition BS_thread_pool.hpp:1400
constexpr version thread_pool_version(BS_THREAD_POOL_VERSION_MAJOR, BS_THREAD_POOL_VERSION_MINOR, BS_THREAD_POOL_VERSION_PATCH)
The version of the thread pool library.
constexpr bool thread_pool_module
A flag indicating whether the thread pool library was compiled as a C++20 module.
Definition BS_thread_pool.hpp:214
pr
An enum containing some pre-defined priorities for convenience.
Definition BS_thread_pool.hpp:398
std::thread thread_t
The type of threads to use. In C++17 we use std::thread.
Definition BS_thread_pool.hpp:381
tp
An enumeration class of flags to be used in the bitmask template parameter of BS::thread_pool to enab...
Definition BS_thread_pool.hpp:250
@ none
No optional features enabled.
@ wait_deadlock_checks
Enable wait deadlock checks.
@ priority
Enable task priority.
@ pause
Enable pausing.
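A bitmask template parameter of this kind is typically decoded into compile-time booleans, in the spirit of the `priority_enabled` and `pause_enabled` constants used in the listing. The sketch below shows the general technique only: the namespace `sketch`, the struct `pool_traits`, and the numeric flag values are assumptions for illustration and are not taken from the library.

```cpp
#include <cstdint>

namespace sketch
{
using opt_t = std::uint8_t;

// Illustrative flag values; the library's actual values may differ.
enum tp : opt_t
{
    none = 0,
    priority = 1 << 0,
    pause = 1 << 1,
    wait_deadlock_checks = 1 << 2
};

// Hypothetical traits struct: each optional feature becomes a constexpr bool
// that can be tested with `if constexpr` or used in std::conditional_t.
template <opt_t OptFlags = tp::none>
struct pool_traits
{
    static constexpr bool priority_enabled = (OptFlags & tp::priority) != 0;
    static constexpr bool pause_enabled = (OptFlags & tp::pause) != 0;
    static constexpr bool wait_deadlock_checks_enabled = (OptFlags & tp::wait_deadlock_checks) != 0;
};
} // namespace sketch
```

Flags combine with the bitwise OR operator, for example `sketch::pool_traits<sketch::tp::priority | sketch::tp::pause>`, and a disabled feature costs nothing at run time because the checks are resolved during compilation.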
A function object class used by detach_blocks() and submit_blocks() to execute a block function over ...
Definition BS_thread_pool.hpp:672
A meta-programming template to determine the common type of two integer types. Unlike std::common_typ...
Definition BS_thread_pool.hpp:1360
A function object class used by detach_loop() and submit_loop() to execute a loop function over a spe...
Definition BS_thread_pool.hpp:691
A helper struct to store a task with an assigned priority.
Definition BS_thread_pool.hpp:410
task_t task
The task. It is mutable so it can be moved out of the const reference returned by std::priority_queue...
Definition BS_thread_pool.hpp:434
friend bool operator<(const pr_task &lhs, const pr_task &rhs) noexcept
Compare the priority of two tasks.
Definition BS_thread_pool.hpp:426
pr_task(task_t &&task_, const priority_t priority_=0) noexcept(std::is_nothrow_move_constructible_v< task_t >)
Construct a new task with an assigned priority.
Definition BS_thread_pool.hpp:417
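The reason the task member is `mutable` can be seen in a small standalone example: `std::priority_queue::top()` returns a const reference, so a move-only or expensive-to-copy task can be moved out only if the member permits modification through a const object. The names `task_fn`, `prioritized_task`, and `pop_highest` below are hypothetical, and `std::function` stands in for the library's task type.

```cpp
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>

using task_fn = std::function<void()>; // Stand-in for the library's task type.

// Hypothetical counterpart of a task with an assigned priority.
struct prioritized_task
{
    explicit prioritized_task(task_fn&& task_, const std::int8_t priority_ = 0) : task(std::move(task_)), priority(priority_) {}

    // Ordering uses the priority only, so moving `task` out does not disturb the heap.
    friend bool operator<(const prioritized_task& lhs, const prioritized_task& rhs) noexcept
    {
        return lhs.priority < rhs.priority;
    }

    mutable task_fn task; // Mutable so it can be moved out of the const reference from top().
    std::int8_t priority = 0;
};

// Remove and return the highest-priority task.
inline task_fn pop_highest(std::priority_queue<prioritized_task>& queue)
{
    task_fn task = std::move(queue.top().task);
    queue.pop();
    return task;
}
```

Because `std::priority_queue` is a max-heap by default, tasks with larger priority values are popped first.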
A function object class used by detach_sequence() and submit_sequence() to execute a sequence functio...
Definition BS_thread_pool.hpp:712
A class that takes a function with a return value (but no arguments), and constructs a task with no r...
Definition BS_thread_pool.hpp:729
A struct used to store a version number, which can be checked and compared at compilation time.
Definition BS_thread_pool.hpp:144