17#ifndef BS_THREAD_POOL_HPP
18#define BS_THREAD_POOL_HPP
22 #if __has_include(<version>)
28#if defined(__clang__) && defined(_LIBCPP_VERSION) && defined(BS_THREAD_POOL_MODULE) && (__cplusplus >= 202002L) && !defined(BS_THREAD_POOL_DISABLE_WORKAROUNDS)
29 #ifdef __cpp_lib_jthread
30 #undef __cpp_lib_jthread
35#if (defined(__GNUC__) && defined(_GLIBCXX_RELEASE) && defined(_WIN32)) && !defined(BS_THREAD_POOL_DISABLE_WORKAROUNDS)
36 #ifdef BS_THREAD_POOL_IMPORT_STD
37 #undef BS_THREAD_POOL_IMPORT_STD
42#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
44 #ifndef WIN32_LEAN_AND_MEAN
45 #define WIN32_LEAN_AND_MEAN
51 #elif defined(__linux__) || defined(__APPLE__)
54 #include <sys/resource.h>
56 #if defined(__linux__)
57 #include <sys/syscall.h>
58 #include <sys/sysinfo.h>
61 #undef BS_THREAD_POOL_NATIVE_EXTENSIONS
66#if defined(BS_THREAD_POOL_IMPORT_STD) && (__cplusplus >= 202004L)
68 #ifdef BS_THREAD_POOL_MODULE
71 #error "The thread pool library cannot import the C++ Standard Library as a module using `import std` if the library itself is not imported as a module. Either use `import BS.thread_pool` to import the library, or remove the `BS_THREAD_POOL_IMPORT_STD` macro. Aborting compilation."
74 #undef BS_THREAD_POOL_IMPORT_STD
78 #include <condition_variable>
92 #include <type_traits>
100 #ifdef __cpp_exceptions
104 #ifdef __cpp_impl_three_way_comparison
107 #ifdef __cpp_lib_int_pow2
110 #ifdef __cpp_lib_jthread
111 #include <stop_token>
136#define BS_THREAD_POOL_VERSION_MAJOR 5
137#define BS_THREAD_POOL_VERSION_MINOR 1
138#define BS_THREAD_POOL_VERSION_PATCH 0
145 constexpr version(
const std::uint64_t major_,
const std::uint64_t minor_,
const std::uint64_t patch_) noexcept : major(major_), minor(minor_), patch(patch_) {}
148#ifdef __cpp_impl_three_way_comparison
149 std::strong_ordering operator<=>(
const version&)
const =
default;
153 return std::tuple(lhs.major, lhs.minor, lhs.patch) == std::tuple(rhs.major, rhs.minor, rhs.patch);
158 return !(lhs == rhs);
163 return std::tuple(lhs.major, lhs.minor, lhs.patch) < std::tuple(rhs.major, rhs.minor, rhs.patch);
173 return std::tuple(lhs.major, lhs.minor, lhs.patch) > std::tuple(rhs.major, rhs.minor, rhs.patch);
182 [[nodiscard]] std::string to_string()
const
184 return std::to_string(major) +
'.' + std::to_string(minor) +
'.' + std::to_string(patch);
189 stream << ver.to_string();
203#ifdef BS_THREAD_POOL_MODULE
205static_assert(
thread_pool_version ==
version(BS_THREAD_POOL_MODULE),
"The versions of BS.thread_pool.cppm and BS_thread_pool.hpp do not match. Aborting compilation.");
217#ifdef BS_THREAD_POOL_IMPORT_STD
229#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
273#define BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(ENUM, OP) \
274 constexpr ENUM operator OP(const ENUM lhs, const ENUM rhs) noexcept \
276 return static_cast<ENUM>(static_cast<std::underlying_type_t<ENUM>>(lhs) OP static_cast<std::underlying_type_t<ENUM>>(rhs)); \
278 constexpr ENUM& operator OP##=(ENUM& lhs, const ENUM rhs) noexcept \
280 return lhs = lhs OP rhs; \
284BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(
tp, &)
285BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(
tp, |)
286BS_THREAD_POOL_DEFINE_BITWISE_OPERATOR(
tp, ^)
288constexpr
tp operator~(const
tp value) noexcept
290 return static_cast<tp>(~static_cast<std::underlying_type_t<tp>>(value));
296#ifdef __cpp_lib_move_only_function
300using std::move_only_function;
302template <
typename...>
311template <
typename R,
typename... Args>
322 template <
typename F,
typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, move_only_function> && std::is_invocable_r_v<R, F&, Args...>>>
323 move_only_function(F&& func) : ptr(std::make_unique<func_model<std::decay_t<F>>>(std::forward<F>(func))) {}
327 return ptr->call(std::forward<Args>(args)...);
333 virtual ~func_concept() =
default;
334 virtual R call(Args... args) = 0;
337 template <
typename F>
338 struct func_model final : func_concept
340 template <
typename T,
typename = std::enable_if_t<!std::is_same_v<std::decay_t<T>, func_model>>>
341 explicit func_model(T&& func) : stored_func(std::forward<T>(func)) {}
343 R call(Args... args)
override
345 if constexpr (std::is_void_v<R>)
347 std::invoke(stored_func, std::forward<Args>(args)...);
351 return std::invoke(stored_func, std::forward<Args>(args)...);
358 std::unique_ptr<func_concept> ptr =
nullptr;
367#ifdef __cpp_lib_jthread
373 #define BS_THREAD_POOL_WORKER_TOKEN const std::stop_token &stop_token,
374 #define BS_THREAD_POOL_WAIT_TOKEN , stop_token
375 #define BS_THREAD_POOL_STOP_CONDITION stop_token.stop_requested()
376 #define BS_THREAD_POOL_OR_STOP_CONDITION
383 #define BS_THREAD_POOL_WORKER_TOKEN
384 #define BS_THREAD_POOL_WAIT_TOKEN
385 #define BS_THREAD_POOL_STOP_CONDITION !workers_running
386 #define BS_THREAD_POOL_OR_STOP_CONDITION || !workers_running
417 explicit pr_task(
task_t&& task_,
const priority_t priority_ = 0) noexcept(std::is_nothrow_move_constructible_v<
task_t>) : task(std::move(task_)), priority(priority_) {}
428 return lhs.priority < rhs.priority;
444 #define BS_THREAD_POOL_IF_PAUSE_ENABLED template <bool P = pause_enabled> requires(P)
446concept init_func_c = std::invocable<F> || std::invocable<F, std::size_t>;
447 #define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) init_func_c F
449 #define BS_THREAD_POOL_IF_PAUSE_ENABLED template <bool P = pause_enabled, typename = std::enable_if_t<P>>
450 #define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) typename F, typename = std::enable_if_t<std::is_invocable_v<F> || std::is_invocable_v<F, std::size_t>>
463 using std::vector<std::future<T>>::vector;
470 [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>>
get()
472 if constexpr (std::is_void_v<T>)
474 for (std::future<T>& future : *
this)
480 std::vector<T> results;
481 results.reserve(this->
size());
482 for (std::future<T>& future : *
this)
483 results.push_back(future.get());
495 std::size_t count = 0;
496 for (
const std::future<T>& future : *
this)
498 if (future.wait_for(std::chrono::duration<double>::zero()) == std::future_status::ready)
509 [[nodiscard]]
bool valid() const noexcept
511 bool is_valid =
true;
512 for (
const std::future<T>& future : *
this)
513 is_valid = is_valid && future.valid();
522 for (
const std::future<T>& future : *
this)
534 template <
typename R,
typename P>
535 bool wait_for(
const std::chrono::duration<R, P>& duration)
const
537 const std::chrono::time_point<std::chrono::steady_clock> start_time = std::chrono::steady_clock::now();
538 for (
const std::future<T>& future : *
this)
540 future.wait_for(duration - (std::chrono::steady_clock::now() - start_time));
541 if (duration < std::chrono::steady_clock::now() - start_time)
555 template <
typename C,
typename D>
556 bool wait_until(
const std::chrono::time_point<C, D>& timeout_time)
const
558 for (
const std::future<T>& future : *
this)
560 future.wait_until(timeout_time);
561 if (timeout_time < C::now())
584 blocks(
const T first_index_,
const T index_after_last_,
const std::size_t num_blocks_) noexcept : num_blocks(num_blocks_), first_index(first_index_), index_after_last(index_after_last_)
586 if (index_after_last > first_index)
588 const std::size_t total_size =
static_cast<std::size_t
>(index_after_last - first_index);
589 num_blocks = std::min(num_blocks, total_size);
590 block_size = total_size / num_blocks;
591 remainder = total_size % num_blocks;
595 num_blocks = (total_size > 1) ? total_size : 1;
610 [[nodiscard]] T
end(
const std::size_t block)
const noexcept
612 return (block == num_blocks - 1) ? index_after_last : start(block + 1);
631 [[nodiscard]] T
start(
const std::size_t block)
const noexcept
633 return first_index +
static_cast<T
>(block * block_size) +
static_cast<T
>(block < remainder ? block : remainder);
640 std::size_t block_size = 0;
645 std::size_t num_blocks = 0;
650 std::size_t remainder = 0;
660 T index_after_last = 0;
670template <
typename T,
typename F,
typename R>
675 return (*block_ptr)(start, end);
678 std::shared_ptr<std::decay_t<F>> block_ptr;
689template <
typename T,
typename F>
694 for (T i = start; i < end; ++i)
698 std::shared_ptr<std::decay_t<F>> loop_ptr;
710template <
typename T,
typename F,
typename R>
715 return (*sequence_ptr)(i);
718 std::shared_ptr<std::decay_t<F>> sequence_ptr;
730 template <
typename F,
typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, task_and_future<R>>>>
733 std::promise<R> promise;
734 future = promise.get_future();
735 task = [task = std::forward<F>(func), promise = std::move(promise)]()
mutable
737#ifdef __cpp_exceptions
741 if constexpr (std::is_void_v<R>)
748 promise.set_value(task());
750#ifdef __cpp_exceptions
756 promise.set_exception(std::current_exception());
766 std::future<R> future;
770#ifdef __cpp_exceptions
774struct [[nodiscard]] wait_deadlock :
public std::runtime_error
776 wait_deadlock() : std::runtime_error(
"BS::wait_deadlock") {};
780#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
785enum class os_process_priority
787 idle = IDLE_PRIORITY_CLASS,
788 below_normal = BELOW_NORMAL_PRIORITY_CLASS,
789 normal = NORMAL_PRIORITY_CLASS,
790 above_normal = ABOVE_NORMAL_PRIORITY_CLASS,
791 high = HIGH_PRIORITY_CLASS,
792 realtime = REALTIME_PRIORITY_CLASS
798enum class os_thread_priority
800 idle = THREAD_PRIORITY_IDLE,
801 lowest = THREAD_PRIORITY_LOWEST,
802 below_normal = THREAD_PRIORITY_BELOW_NORMAL,
803 normal = THREAD_PRIORITY_NORMAL,
804 above_normal = THREAD_PRIORITY_ABOVE_NORMAL,
805 highest = THREAD_PRIORITY_HIGHEST,
806 realtime = THREAD_PRIORITY_TIME_CRITICAL
808 #elif defined(__linux__) || defined(__APPLE__)
812enum class os_process_priority
815 below_normal = PRIO_MAX / 2,
817 above_normal = PRIO_MIN / 3,
818 high = PRIO_MIN * 2 / 3,
825enum class os_thread_priority
842[[nodiscard]]
inline std::optional<std::vector<bool>> get_os_process_affinity()
845 DWORD_PTR process_mask = 0;
846 DWORD_PTR system_mask = 0;
847 if (GetProcessAffinityMask(GetCurrentProcess(), &process_mask, &system_mask) == 0)
849 #ifdef __cpp_lib_int_pow2
850 const std::size_t num_cpus =
static_cast<std::size_t
>(std::bit_width(system_mask));
852 std::size_t num_cpus = 0;
853 if (system_mask != 0)
856 while ((system_mask >>= 1U) != 0U)
860 std::vector<bool> affinity(num_cpus);
861 for (std::size_t i = 0; i < num_cpus; ++i)
862 affinity[i] = ((process_mask & (1ULL << i)) != 0ULL);
864 #elif defined(__linux__)
867 if (sched_getaffinity(getpid(),
sizeof(cpu_set_t), &cpu_set) != 0)
869 const int num_cpus = get_nprocs();
872 std::vector<bool> affinity(
static_cast<std::size_t
>(num_cpus));
873 for (std::size_t i = 0; i < affinity.size(); ++i)
874 affinity[i] = CPU_ISSET(i, &cpu_set);
876 #elif defined(__APPLE__)
887inline bool set_os_process_affinity([[maybe_unused]]
const std::vector<bool>& affinity)
890 DWORD_PTR process_mask = 0;
891 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(),
sizeof(DWORD_PTR) * 8); ++i)
892 process_mask |= (affinity[i] ? (1ULL << i) : 0ULL);
893 return SetProcessAffinityMask(GetCurrentProcess(), process_mask) != 0;
894 #elif defined(__linux__)
897 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(), CPU_SETSIZE); ++i)
900 CPU_SET(i, &cpu_set);
902 return sched_setaffinity(getpid(),
sizeof(cpu_set_t), &cpu_set) == 0;
903 #elif defined(__APPLE__)
913[[nodiscard]]
inline std::optional<os_process_priority> get_os_process_priority()
917 const DWORD
priority = GetPriorityClass(GetCurrentProcess());
920 return static_cast<os_process_priority
>(
priority);
921 #elif defined(__linux__) || defined(__APPLE__)
923 const int nice_val = getpriority(PRIO_PROCESS,
static_cast<id_t
>(getpid()));
926 case static_cast<int>(os_process_priority::idle):
927 return os_process_priority::idle;
928 case static_cast<int>(os_process_priority::below_normal):
929 return os_process_priority::below_normal;
930 case static_cast<int>(os_process_priority::normal):
931 return os_process_priority::normal;
932 case static_cast<int>(os_process_priority::above_normal):
933 return os_process_priority::above_normal;
934 case static_cast<int>(os_process_priority::high):
935 return os_process_priority::high;
936 case static_cast<int>(os_process_priority::realtime):
937 return os_process_priority::realtime;
950inline bool set_os_process_priority(
const os_process_priority
priority)
954 return SetPriorityClass(GetCurrentProcess(),
static_cast<DWORD
>(
priority)) != 0;
955 #elif defined(__linux__) || defined(__APPLE__)
958 return setpriority(PRIO_PROCESS,
static_cast<id_t
>(getpid()),
static_cast<int>(
priority)) == 0;
977 [[nodiscard]]
static std::optional<std::size_t>
get_index() noexcept
987 [[nodiscard]]
static std::optional<void*>
get_pool() noexcept
992#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
998 [[nodiscard]]
static std::optional<std::vector<bool>> get_os_thread_affinity()
1002 DWORD_PTR process_mask = 0;
1003 DWORD_PTR system_mask = 0;
1004 if (GetProcessAffinityMask(GetCurrentProcess(), &process_mask, &system_mask) == 0)
1005 return std::nullopt;
1006 const DWORD_PTR previous_mask = SetThreadAffinityMask(GetCurrentThread(), process_mask);
1007 if (previous_mask == 0)
1008 return std::nullopt;
1009 SetThreadAffinityMask(GetCurrentThread(), previous_mask);
1010 #ifdef __cpp_lib_int_pow2
1011 const std::size_t num_cpus =
static_cast<std::size_t
>(std::bit_width(system_mask));
1013 std::size_t num_cpus = 0;
1014 if (system_mask != 0)
1017 while ((system_mask >>= 1U) != 0U)
1021 std::vector<bool> affinity(num_cpus);
1022 for (std::size_t i = 0; i < num_cpus; ++i)
1023 affinity[i] = ((previous_mask & (1ULL << i)) != 0ULL);
1025 #elif defined(__linux__) && !defined(__ANDROID__)
1028 if (pthread_getaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpu_set) != 0)
1029 return std::nullopt;
1030 const int num_cpus = get_nprocs();
1032 return std::nullopt;
1033 std::vector<bool> affinity(
static_cast<std::size_t
>(num_cpus));
1034 for (std::size_t i = 0; i < affinity.size(); ++i)
1035 affinity[i] = CPU_ISSET(i, &cpu_set);
1038 return std::nullopt;
1048 static bool set_os_thread_affinity([[maybe_unused]]
const std::vector<bool>& affinity)
1051 DWORD_PTR thread_mask = 0;
1052 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(),
sizeof(DWORD_PTR) * 8); ++i)
1053 thread_mask |= (affinity[i] ? (1ULL << i) : 0ULL);
1054 return SetThreadAffinityMask(GetCurrentThread(), thread_mask) != 0;
1055 #elif defined(__linux__) && !defined(__ANDROID__)
1058 for (std::size_t i = 0; i < std::min<std::size_t>(affinity.size(), CPU_SETSIZE); ++i)
1061 CPU_SET(i, &cpu_set);
1063 return pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpu_set) == 0;
1074 [[nodiscard]]
static std::optional<std::string> get_os_thread_name()
1078 PWSTR data =
nullptr;
1079 const HRESULT hr = GetThreadDescription(GetCurrentThread(), &data);
1081 return std::nullopt;
1082 if (data ==
nullptr)
1083 return std::nullopt;
1084 const int size = WideCharToMultiByte(CP_UTF8, 0, data, -1,
nullptr, 0,
nullptr,
nullptr);
1088 return std::nullopt;
1090 std::string name(
static_cast<std::size_t
>(size) - 1, 0);
1091 const int result = WideCharToMultiByte(CP_UTF8, 0, data, -1, name.data(), size,
nullptr,
nullptr);
1094 return std::nullopt;
1096 #elif defined(__linux__) || defined(__APPLE__)
1099 constexpr std::size_t buffer_size = 16;
1102 constexpr std::size_t buffer_size = 64;
1104 char name[buffer_size] = {};
1105 if (pthread_getname_np(pthread_self(), name, buffer_size) != 0)
1106 return std::nullopt;
1107 return std::string(name);
1117 static bool set_os_thread_name(
const std::string& name)
1121 const int size = MultiByteToWideChar(CP_UTF8, 0, name.data(), -1,
nullptr, 0);
1124 std::wstring wide(
static_cast<std::size_t
>(size), 0);
1125 if (MultiByteToWideChar(CP_UTF8, 0, name.data(), -1, wide.data(), size) == 0)
1127 const HRESULT hr = SetThreadDescription(GetCurrentThread(), wide.data());
1128 return SUCCEEDED(hr);
1129 #elif defined(__linux__)
1131 return pthread_setname_np(pthread_self(), name.data()) == 0;
1132 #elif defined(__APPLE__)
1134 return pthread_setname_np(name.data()) == 0;
1143 [[nodiscard]]
static std::optional<os_thread_priority> get_os_thread_priority()
1147 const int priority = GetThreadPriority(GetCurrentThread());
1148 if (priority == THREAD_PRIORITY_ERROR_RETURN)
1149 return std::nullopt;
1150 return static_cast<os_thread_priority
>(
priority);
1151 #elif defined(__linux__)
1154 struct sched_param param = {};
1155 if (pthread_getschedparam(pthread_self(), &policy, ¶m) != 0)
1156 return std::nullopt;
1157 if (policy == SCHED_FIFO && param.sched_priority == sched_get_priority_max(SCHED_FIFO))
1160 return os_thread_priority::realtime;
1162 if (policy == SCHED_RR && param.sched_priority == sched_get_priority_min(SCHED_RR) + ((sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2))
1165 return os_thread_priority::highest;
1168 if (policy == SCHED_IDLE)
1171 return os_thread_priority::idle;
1174 if (policy == SCHED_OTHER)
1177 const int nice_val = getpriority(PRIO_PROCESS,
static_cast<id_t
>(syscall(SYS_gettid)));
1181 return os_thread_priority::above_normal;
1183 return os_thread_priority::normal;
1184 case (PRIO_MAX / 2) + (PRIO_MAX % 2):
1185 return os_thread_priority::below_normal;
1187 return os_thread_priority::lowest;
1191 return os_thread_priority::idle;
1194 return std::nullopt;
1197 return std::nullopt;
1198 #elif defined(__APPLE__)
1201 struct sched_param param = {};
1202 if (pthread_getschedparam(pthread_self(), &policy, ¶m) != 0)
1203 return std::nullopt;
1204 if (policy == SCHED_FIFO && param.sched_priority == sched_get_priority_max(SCHED_FIFO))
1207 return os_thread_priority::realtime;
1209 if (policy == SCHED_RR && param.sched_priority == sched_get_priority_min(SCHED_RR) + (sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2)
1212 return os_thread_priority::highest;
1214 if (policy == SCHED_OTHER)
1217 if (param.sched_priority == sched_get_priority_max(SCHED_OTHER))
1218 return os_thread_priority::above_normal;
1219 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 2)
1220 return os_thread_priority::normal;
1221 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) * 2 / 3)
1222 return os_thread_priority::below_normal;
1223 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 3)
1224 return os_thread_priority::lowest;
1225 if (param.sched_priority == sched_get_priority_min(SCHED_OTHER))
1226 return os_thread_priority::idle;
1227 return std::nullopt;
1229 return std::nullopt;
1239 static bool set_os_thread_priority(
const os_thread_priority priority)
1243 return SetThreadPriority(GetCurrentThread(),
static_cast<int>(priority)) != 0;
1244 #elif defined(__linux__)
1247 struct sched_param param = {};
1248 std::optional<int> nice_val = std::nullopt;
1251 case os_thread_priority::realtime:
1253 policy = SCHED_FIFO;
1254 param.sched_priority = sched_get_priority_max(SCHED_FIFO);
1256 case os_thread_priority::highest:
1259 param.sched_priority = sched_get_priority_min(SCHED_RR) + ((sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2);
1261 case os_thread_priority::above_normal:
1263 policy = SCHED_OTHER;
1264 param.sched_priority = 0;
1265 nice_val = PRIO_MIN + 2;
1267 case os_thread_priority::normal:
1269 policy = SCHED_OTHER;
1270 param.sched_priority = 0;
1273 case os_thread_priority::below_normal:
1275 policy = SCHED_OTHER;
1276 param.sched_priority = 0;
1277 nice_val = (PRIO_MAX / 2) + (PRIO_MAX % 2);
1279 case os_thread_priority::lowest:
1281 policy = SCHED_OTHER;
1282 param.sched_priority = 0;
1283 nice_val = PRIO_MAX - 3;
1285 case os_thread_priority::idle:
1287 policy = SCHED_IDLE;
1288 param.sched_priority = 0;
1293 bool success = (pthread_setschedparam(pthread_self(), policy, ¶m) == 0);
1294 if (nice_val.has_value())
1295 success = success && (setpriority(PRIO_PROCESS,
static_cast<id_t
>(syscall(SYS_gettid)), nice_val.value()) == 0);
1297 #elif defined(__APPLE__)
1300 struct sched_param param = {};
1303 case os_thread_priority::realtime:
1305 policy = SCHED_FIFO;
1306 param.sched_priority = sched_get_priority_max(SCHED_FIFO);
1308 case os_thread_priority::highest:
1311 param.sched_priority = sched_get_priority_min(SCHED_RR) + (sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) / 2;
1313 case os_thread_priority::above_normal:
1315 policy = SCHED_OTHER;
1316 param.sched_priority = sched_get_priority_max(SCHED_OTHER);
1318 case os_thread_priority::normal:
1320 policy = SCHED_OTHER;
1321 param.sched_priority = sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 2;
1323 case os_thread_priority::below_normal:
1325 policy = SCHED_OTHER;
1326 param.sched_priority = sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) * 2 / 3;
1328 case os_thread_priority::lowest:
1330 policy = SCHED_OTHER;
1331 param.sched_priority = sched_get_priority_min(SCHED_OTHER) + (sched_get_priority_max(SCHED_OTHER) - sched_get_priority_min(SCHED_OTHER)) / 3;
1333 case os_thread_priority::idle:
1335 policy = SCHED_OTHER;
1336 param.sched_priority = sched_get_priority_min(SCHED_OTHER);
1341 return pthread_setschedparam(pthread_self(), policy, ¶m) == 0;
1347 inline static thread_local std::optional<std::size_t> my_index = std::nullopt;
1348 inline static thread_local std::optional<void*> my_pool = std::nullopt;
1358template <
typename T1,
typename T2,
typename Enable =
void>
1362 using type = std::common_type_t<T1, T2>;
1366template <
typename T1,
typename T2>
1369 using type = std::conditional_t<(
sizeof(T1) >=
sizeof(T2)), T1, T2>;
1373template <
typename T1,
typename T2>
1376 using type = std::conditional_t<(
sizeof(T1) >=
sizeof(T2)), T1, T2>;
1380template <
typename T1,
typename T2>
1383 using S = std::conditional_t<std::is_signed_v<T1>, T1, T2>;
1384 using U = std::conditional_t<std::is_unsigned_v<T1>, T1, T2>;
1385 static constexpr std::size_t larger_size = (
sizeof(S) >
sizeof(U)) ?
sizeof(S) :
sizeof(U);
1386 using type = std::conditional_t<larger_size <= 4,
1388 std::conditional_t<larger_size == 1 || (
sizeof(S) == 2 &&
sizeof(U) == 1), std::int16_t, std::conditional_t<larger_size == 2 || (
sizeof(S) == 4 &&
sizeof(U) < 4), std::int32_t, std::int64_t>>,
1390 std::conditional_t<
sizeof(U) == 8, std::uint64_t, std::int64_t>>;
1399template <
typename T1,
typename T2>
1427template <tp OptFlags = tp::none>
1434 static constexpr bool priority_enabled = (OptFlags & tp::priority) != tp::none;
1439 static constexpr bool pause_enabled = (OptFlags & tp::pause) != tp::none;
1444 static constexpr bool wait_deadlock_checks_enabled = (OptFlags & tp::wait_deadlock_checks) != tp::none;
1446#ifndef __cpp_exceptions
1447 static_assert(!wait_deadlock_checks_enabled,
"Wait deadlock checks cannot be enabled if exception handling is disabled.");
1471 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1480 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1483 create_threads(num_threads, std::forward<F>(init));
1497#ifdef __cpp_exceptions
1502#ifndef __cpp_lib_jthread
1505#ifdef __cpp_exceptions
1530 template <
typename T1,
typename T2,
typename T = common_index_type_t<T1, T2>,
typename F>
1531 void detach_blocks(
const T1 first_index,
const T2 index_after_last, F&& block,
const std::size_t num_blocks = 0,
const priority_t priority = 0)
1533 enqueue_blocks<T, F, void, false>(
static_cast<T
>(first_index),
static_cast<T
>(index_after_last), std::forward<F>(block), num_blocks,
priority);
1544 template <
typename I>
1549 bool notify =
false;
1551 const std::scoped_lock tasks_lock(tasks_mutex);
1552 if constexpr (pause_enabled)
1553 notify = tasks.empty() && !paused;
1555 notify = tasks.empty();
1556 for (I it = first; it != last; ++it)
1558 if constexpr (priority_enabled)
1559 tasks.emplace(std::move(*it),
priority);
1561 tasks.emplace(std::move(*it));
1565 task_available_cv.notify_all();
1576 template <
typename C>
1579 detach_bulk(std::begin(container), std::end(container),
priority);
1595 template <
typename T1,
typename T2,
typename T = common_index_type_t<T1, T2>,
typename F>
1596 void detach_loop(
const T1 first_index,
const T2 index_after_last, F&& loop,
const std::size_t num_blocks = 0,
const priority_t priority = 0)
1598 enqueue_loop<T, F, false>(
static_cast<T
>(first_index),
static_cast<T
>(index_after_last), std::forward<F>(loop), num_blocks,
priority);
1613 template <
typename T1,
typename T2,
typename T = common_index_type_t<T1, T2>,
typename F>
1616 return enqueue_sequence<T, F, void, false>(
static_cast<T
>(first_index),
static_cast<T
>(index_after_last), std::forward<F>(sequence),
priority);
1626 template <
typename F>
1630 const std::scoped_lock tasks_lock(tasks_mutex);
1631 if constexpr (priority_enabled)
1632 tasks.emplace(std::forward<F>(task),
priority);
1634 tasks.emplace(std::forward<F>(task));
1636 task_available_cv.notify_one();
1639#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
1645 [[nodiscard]] std::vector<thread_t::native_handle_type> get_native_handles()
const
1647 std::vector<thread_t::native_handle_type> native_handles(thread_count);
1648 for (std::size_t i = 0; i < thread_count; ++i)
1649 native_handles[i] = threads[i].native_handle();
1650 return native_handles;
1661 const std::scoped_lock tasks_lock(tasks_mutex);
1662 return tasks.size();
1672 const std::scoped_lock tasks_lock(tasks_mutex);
1673 return tasks_running;
1683 const std::scoped_lock tasks_lock(tasks_mutex);
1684 return tasks_running + tasks.size();
1694 return thread_count;
1704 std::vector<thread_t::id> thread_ids(thread_count);
1705 for (std::size_t i = 0; i < thread_count; ++i)
1706 thread_ids[i] = threads[i].get_id();
1715 BS_THREAD_POOL_IF_PAUSE_ENABLED
1718 const std::scoped_lock tasks_lock(tasks_mutex);
1725 BS_THREAD_POOL_IF_PAUSE_ENABLED
1728 const std::scoped_lock tasks_lock(tasks_mutex);
1737 const std::scoped_lock tasks_lock(tasks_mutex);
1746 reset(0, [](std::size_t) {});
1754 void reset(
const std::size_t num_threads)
1756 reset(num_threads, [](std::size_t) {});
1764 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1767 reset(0, std::forward<F>(init));
1776 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1777 void reset(
const std::size_t num_threads, F&& init)
1779 if constexpr (pause_enabled)
1781 std::unique_lock tasks_lock(tasks_mutex);
1782 const bool was_paused = paused;
1784 tasks_lock.unlock();
1785 reset_pool(num_threads, std::forward<F>(init));
1787 paused = was_paused;
1788 tasks_lock.unlock();
1790 task_available_cv.notify_all();
1794 reset_pool(num_threads, std::forward<F>(init));
1803 template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1806 if constexpr (std::is_invocable_v<F, std::size_t>)
1808 cleanup_func = std::forward<F>(cleanup);
1812 cleanup_func = [cleanup = std::forward<F>(cleanup)](std::size_t)
1834 template <
typename T1,
typename T2,
typename T = common_index_type_t<T1, T2>,
typename F,
typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
1837 return enqueue_blocks<T, F, R, true>(
static_cast<T
>(first_index),
static_cast<T
>(index_after_last), std::forward<F>(block), num_blocks,
priority);
1851 template <typename I, typename F = decltype(*std::declval<I>()),
typename R = std::invoke_result_t<std::decay_t<F>>>
1856 const std::size_t num_tasks =
static_cast<std::size_t
>(std::distance(first, last));
1858 all_futures.reserve(num_tasks);
1859 std::vector<task_t> all_tasks;
1860 all_tasks.reserve(num_tasks);
1861 for (I it = first; it != last; ++it)
1864 all_futures.emplace_back(std::move(ft.future));
1865 all_tasks.emplace_back(std::move(ft.task));
1883 template <typename C, typename F = decltype(*std::declval<C&>().begin()),
typename R = std::invoke_result_t<std::decay_t<F>>>
1886 return submit_bulk(std::begin(container), std::end(container),
priority);
1903 template <
typename T1,
typename T2,
typename T = common_index_type_t<T1, T2>,
typename F>
1906 return enqueue_loop<T, F, true>(
static_cast<T
>(first_index),
static_cast<T
>(index_after_last), std::forward<F>(loop), num_blocks,
priority);
1923 template <
typename T1,
typename T2,
typename T = common_index_type_t<T1, T2>,
typename F,
typename R = std::invoke_result_t<std::decay_t<F>, T>>
1926 return enqueue_sequence<T, F, R, true>(
static_cast<T
>(first_index),
static_cast<T
>(index_after_last), std::forward<F>(sequence),
priority);
1938 template <
typename F,
typename R = std::invoke_result_t<std::decay_t<F>>>
1942 detach_task(std::move(ft.task),
priority);
1943 return std::move(ft.future);
1949 BS_THREAD_POOL_IF_PAUSE_ENABLED
1953 const std::scoped_lock tasks_lock(tasks_mutex);
1956 task_available_cv.notify_all();
1966#ifdef __cpp_exceptions
1967 if constexpr (wait_deadlock_checks_enabled)
1969 if (this_thread::get_pool() ==
this)
1970 throw wait_deadlock();
1973 std::unique_lock tasks_lock(tasks_mutex);
1975 tasks_done_cv.wait(tasks_lock,
1978 if constexpr (pause_enabled)
1979 return (tasks_running == 0) && (paused || tasks.empty());
1981 return (tasks_running == 0) && tasks.empty();
1995 template <
typename R,
typename P>
1996 bool wait_for(
const std::chrono::duration<R, P>& duration)
1998#ifdef __cpp_exceptions
1999 if constexpr (wait_deadlock_checks_enabled)
2001 if (this_thread::get_pool() ==
this)
2002 throw wait_deadlock();
2005 std::unique_lock tasks_lock(tasks_mutex);
2007 const bool status = tasks_done_cv.wait_for(tasks_lock, duration,
2010 if constexpr (pause_enabled)
2011 return (tasks_running == 0) && (paused || tasks.empty());
2013 return (tasks_running == 0) && tasks.empty();
2028 template <
typename C,
typename D>
2029 bool wait_until(
const std::chrono::time_point<C, D>& timeout_time)
2031#ifdef __cpp_exceptions
2032 if constexpr (wait_deadlock_checks_enabled)
2034 if (this_thread::get_pool() ==
this)
2035 throw wait_deadlock();
2038 std::unique_lock tasks_lock(tasks_mutex);
2040 const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time,
2043 if constexpr (pause_enabled)
2044 return (tasks_running == 0) && (paused || tasks.empty());
2046 return (tasks_running == 0) && tasks.empty();
2063 template <
typename F>
2066 if constexpr (std::is_invocable_v<F, std::size_t>)
2068 init_func = std::forward<F>(init);
2072 init_func = [init = std::forward<F>(init)](std::size_t)
2077 thread_count = determine_thread_count(num_threads);
2078 threads = std::make_unique<thread_t[]>(thread_count);
2080 const std::scoped_lock tasks_lock(tasks_mutex);
2081 tasks_running = thread_count;
2082#ifndef __cpp_lib_jthread
2083 workers_running =
true;
2086 for (std::size_t i = 0; i < thread_count; ++i)
2090#ifdef __cpp_lib_jthread
2091 (
const std::stop_token& stop_token)
2093 worker(stop_token, i);
2104#ifndef __cpp_lib_jthread
2111 const std::scoped_lock tasks_lock(tasks_mutex);
2112 workers_running =
false;
2114 task_available_cv.notify_all();
2115 for (std::size_t i = 0; i < thread_count; ++i)
2127 if (num_threads > 0)
2129#ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
2130 const std::optional<std::vector<bool>> affinity = BS::get_os_process_affinity();
2131 if (affinity.has_value())
2133 const std::size_t affinity_thread_count =
static_cast<std::size_t
>(std::count(affinity->begin(), affinity->end(),
true));
2134 return (affinity_thread_count > 0) ? affinity_thread_count : 1;
2137 if (thread_t::hardware_concurrency() > 0)
2138 return thread_t::hardware_concurrency();
2157 template <
typename T,
typename F,
typename R,
bool submit,
typename N = std::conditional_t<submit, multi_future<R>,
void>>
2158 [[nodiscard]] N
enqueue_blocks(
const T first_index,
const T index_after_last, F&& block, std::size_t num_blocks,
const priority_t priority = 0)
2160 if (index_after_last > first_index)
2163 const std::shared_ptr<std::decay_t<F>> block_ptr = std::make_shared<std::decay_t<F>>(std::forward<F>(block));
2164 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
2166 std::vector<std::conditional_t<submit, block_task_t, task_t>> all_tasks;
2167 all_tasks.reserve(num_blocks);
2168 for (std::size_t i = 0; i < num_blocks; ++i)
2169 all_tasks.emplace_back(block_task_t{block_ptr, blks.start(i), blks.end(i)});
2170 if constexpr (submit)
2171 return submit_bulk(all_tasks,
priority);
2192 template <
typename T,
typename F,
bool submit,
typename N = std::conditional_t<submit, multi_future<
void>,
void>>
2195 if (index_after_last > first_index)
2198 const std::shared_ptr<std::decay_t<F>> loop_ptr = std::make_shared<std::decay_t<F>>(std::forward<F>(loop));
2199 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
2201 std::vector<std::conditional_t<submit, loop_task_t, task_t>> all_tasks;
2202 all_tasks.reserve(num_blocks);
2203 for (std::size_t i = 0; i < num_blocks; ++i)
2204 all_tasks.emplace_back(loop_task_t{loop_ptr, blks.start(i), blks.end(i)});
2205 if constexpr (submit)
2206 return submit_bulk(all_tasks,
priority);
2227 template <
typename T,
typename F,
typename R,
bool submit,
typename N = std::conditional_t<submit, multi_future<R>,
void>>
2230 if (index_after_last > first_index)
2233 const std::shared_ptr<std::decay_t<F>> sequence_ptr = std::make_shared<std::decay_t<F>>(std::forward<F>(sequence));
2234 std::vector<std::conditional_t<submit, sequence_task_t, task_t>> all_tasks;
2235 all_tasks.reserve(
static_cast<std::size_t
>(index_after_last - first_index));
2236 for (T i = first_index; i < index_after_last; ++i)
2237 all_tasks.emplace_back(sequence_task_t{sequence_ptr, i});
2238 if constexpr (submit)
2239 return submit_bulk(all_tasks,
priority);
2254 if constexpr (priority_enabled)
2255 task = std::move(tasks.top().task);
2257 task = std::move(tasks.front());
2268 template <
typename F>
2272#ifndef __cpp_lib_jthread
2275 create_threads(num_threads, std::forward<F>(init));
2283 void worker(BS_THREAD_POOL_WORKER_TOKEN
const std::size_t idx)
2285 this_thread::my_pool =
this;
2286 this_thread::my_index = idx;
2290 std::unique_lock tasks_lock(tasks_mutex);
2292 if constexpr (pause_enabled)
2294 if (waiting && (tasks_running == 0) && (paused || tasks.empty()))
2295 tasks_done_cv.notify_all();
2299 if (waiting && (tasks_running == 0) && tasks.empty())
2300 tasks_done_cv.notify_all();
2302 task_available_cv.wait(tasks_lock BS_THREAD_POOL_WAIT_TOKEN,
2305 if constexpr (pause_enabled)
2306 return !(paused || tasks.empty()) BS_THREAD_POOL_OR_STOP_CONDITION;
2308 return !tasks.empty() BS_THREAD_POOL_OR_STOP_CONDITION;
2310 if (BS_THREAD_POOL_STOP_CONDITION)
2313 task_t task = pop_task();
2315 tasks_lock.unlock();
2316#ifdef __cpp_exceptions
2321#ifdef __cpp_exceptions
2330 this_thread::my_index = std::nullopt;
2331 this_thread::my_pool = std::nullopt;
2346#ifdef __cpp_lib_jthread
2347 std::condition_variable_any
2349 std::condition_variable
2371 std::conditional_t<priority_enabled, std::priority_queue<pr_task>, std::queue<task_t>>
tasks;
2376 std::size_t tasks_running = 0;
2381 std::size_t thread_count = 0;
2386 std::unique_ptr<thread_t[]> threads =
nullptr;
2391 std::conditional_t<pause_enabled, bool, std::monostate> paused = {};
2396 bool waiting =
false;
2398#ifndef __cpp_lib_jthread
2402 bool workers_running =
false;
2417 add_stream(std::cout);
2426 template <
typename... T>
2429 (add_stream(streams), ...);
2439 out_streams.push_back(&stream);
2458 template <
typename... T>
2461 const std::scoped_lock stream_lock(stream_mutex);
2462 for (std::ostream*
const stream : out_streams)
2463 (*stream << ... << items);
2472 template <
typename... T>
2475 print(std::forward<T>(items)...,
'\n');
2485 out_streams.erase(std::remove(out_streams.begin(), out_streams.end(), &stream), out_streams.end());
2491 inline static std::ostream& (&
endl)(std::ostream&) =
static_cast<std::ostream& (&)(std::ostream&)
>(std::endl);
2496 inline static std::ostream& (&
flush)(std::ostream&) =
static_cast<std::ostream& (&)(std::ostream&)
>(std::flush);
A helper class to divide a range into blocks. Used by detach_blocks(), submit_blocks(),...
Definition BS_thread_pool.hpp:575
T start(const std::size_t block) const noexcept
Get the first index of a block.
Definition BS_thread_pool.hpp:631
T end(const std::size_t block) const noexcept
Get the index after the last index of a block.
Definition BS_thread_pool.hpp:610
std::size_t get_num_blocks() const noexcept
Get the number of blocks. Note that this may be different than the desired number of blocks that was ...
Definition BS_thread_pool.hpp:620
blocks(const T first_index_, const T index_after_last_, const std::size_t num_blocks_) noexcept
Construct a blocks object with the given specifications.
Definition BS_thread_pool.hpp:584
Definition BS_thread_pool.hpp:303
A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
Definition BS_thread_pool.hpp:460
void wait() const
Wait for all the futures stored in this BS::multi_future.
Definition BS_thread_pool.hpp:520
bool valid() const noexcept
Check if all the futures stored in this BS::multi_future are valid.
Definition BS_thread_pool.hpp:509
bool wait_for(const std::chrono::duration< R, P > &duration) const
Wait for all the futures stored in this BS::multi_future, but stop waiting after the specified durati...
Definition BS_thread_pool.hpp:535
std::size_t ready_count() const
Check how many of the futures stored in this BS::multi_future are ready.
Definition BS_thread_pool.hpp:493
std::conditional_t< std::is_void_v< T >, void, std::vector< T > > get()
Get the results from all the futures stored in this BS::multi_future, rethrowing any stored exception...
Definition BS_thread_pool.hpp:470
bool wait_until(const std::chrono::time_point< C, D > &timeout_time) const
Wait for all the futures stored in this BS::multi_future, but stop waiting after the specified time p...
Definition BS_thread_pool.hpp:556
A utility class to synchronize printing to one or more output streams by different threads.
Definition BS_thread_pool.hpp:2410
std::vector< std::ostream * > out_streams
The output streams to print to.
Definition BS_thread_pool.hpp:2507
std::mutex stream_mutex
A mutex to synchronize printing.
Definition BS_thread_pool.hpp:2502
static std::ostream &(&) endl(std::ostream &)
A stream manipulator to pass to a BS::synced_stream (an explicit cast of std::endl)....
Definition BS_thread_pool.hpp:2491
static std::ostream &(&) flush(std::ostream &)
A stream manipulator to pass to a BS::synced_stream (an explicit cast of std::flush)....
Definition BS_thread_pool.hpp:2496
synced_stream()
Construct a new synced stream which prints to std::cout.
Definition BS_thread_pool.hpp:2415
void add_stream(std::ostream &stream)
Add a stream to the list of output streams to print to.
Definition BS_thread_pool.hpp:2437
void println(T &&... items)
Print any number of items into each output stream, followed by a newline character....
Definition BS_thread_pool.hpp:2473
void remove_stream(std::ostream &stream)
Remove a stream from the list of output streams to print to.
Definition BS_thread_pool.hpp:2483
void print(const T &... items)
Print any number of items into each output stream. Ensures that no other threads print to the streams...
Definition BS_thread_pool.hpp:2459
std::vector< std::ostream * > & get_streams() noexcept
Get a reference to a vector containing pointers to the output streams to print to.
Definition BS_thread_pool.hpp:2447
synced_stream(T &... streams)
Construct a new synced stream which prints to the given output stream(s).
Definition BS_thread_pool.hpp:2427
A class used to obtain information about the current thread and, if native extensions are enabled,...
Definition BS_thread_pool.hpp:967
static std::optional< void * > get_pool() noexcept
Get a pointer to the thread pool that owns the current thread. If this thread belongs to a BS::thread...
Definition BS_thread_pool.hpp:987
static std::optional< std::size_t > get_index() noexcept
Get the index of the current thread. If this thread belongs to a BS::thread_pool object,...
Definition BS_thread_pool.hpp:977
A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread pool class.
Definition BS_thread_pool.hpp:1429
void create_threads(const std::size_t num_threads, F &&init)
Create the threads in the pool and assign a worker to each thread.
Definition BS_thread_pool.hpp:2064
void reset_pool(const std::size_t num_threads, F &&init)
Reset the pool with a new number of threads and a new initialization function. This member function i...
Definition BS_thread_pool.hpp:2269
thread_pool(F &&init)
Construct a new thread pool with the specified initialization function and the default number of thre...
Definition BS_thread_pool.hpp:1472
thread_pool(const std::size_t num_threads)
Construct a new thread pool with the specified number of threads.
Definition BS_thread_pool.hpp:1464
BS_THREAD_POOL_IF_PAUSE_ENABLED bool is_paused() const
Check whether the pool is currently paused. Only enabled if the flag BS::tp::pause is enabled in the ...
Definition BS_thread_pool.hpp:1716
multi_future< R > submit_bulk(const I first, const I last, const priority_t priority=0)
Submit an iterator range containing functions with no arguments into the task queue,...
Definition BS_thread_pool.hpp:1852
std::mutex tasks_mutex
A mutex to synchronize access to the task queue by different threads.
Definition BS_thread_pool.hpp:2341
void detach_blocks(const T1 first_index, const T2 index_after_last, F &&block, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1531
void detach_loop(const T1 first_index, const T2 index_after_last, F &&loop, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1596
void worker(BS_THREAD_POOL_WORKER_TOKEN const std::size_t idx)
A worker function to be assigned to each thread in the pool. Waits until it is notified by detach_tas...
Definition BS_thread_pool.hpp:2283
std::future< R > submit_task(F &&task, const priority_t priority=0)
Submit a function with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:1939
std::condition_variable task_available_cv
A condition variable to notify worker() that a new task has become available.
Definition BS_thread_pool.hpp:2351
thread_pool()
Construct a new thread pool. The number of threads will be the total number of hardware threads avail...
Definition BS_thread_pool.hpp:1457
std::condition_variable tasks_done_cv
A condition variable to notify wait() that the tasks are done.
Definition BS_thread_pool.hpp:2356
void reset(const std::size_t num_threads)
Reset the pool with a new number of threads. Waits for all tasks to be completed, both running and qu...
Definition BS_thread_pool.hpp:1754
BS_THREAD_POOL_IF_PAUSE_ENABLED void pause()
Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue,...
Definition BS_thread_pool.hpp:1726
multi_future< void > submit_loop(const T1 first_index, const T2 index_after_last, F &&loop, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1904
void reset(const std::size_t num_threads, F &&init)
Reset the pool with a new number of threads and a new initialization function. Waits for all tasks to...
Definition BS_thread_pool.hpp:1777
void purge()
Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected,...
Definition BS_thread_pool.hpp:1735
thread_pool(const std::size_t num_threads, F &&init)
Construct a new thread pool with the specified number of threads and initialization function.
Definition BS_thread_pool.hpp:1481
std::size_t get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition BS_thread_pool.hpp:1659
N enqueue_loop(const T first_index, const T index_after_last, F &&loop, std::size_t num_blocks, const priority_t priority=0)
A helper function for detach_loop() and submit_loop().
Definition BS_thread_pool.hpp:2193
multi_future< R > submit_blocks(const T1 first_index, const T2 index_after_last, F &&block, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1835
std::conditional_t< priority_enabled, std::priority_queue< pr_task >, std::queue< task_t > > tasks
A queue of tasks to be executed by the threads.
Definition BS_thread_pool.hpp:2371
std::vector< thread_t::id > get_thread_ids() const
Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::th...
Definition BS_thread_pool.hpp:1702
void set_cleanup_func(F &&cleanup)
Set the thread pool's cleanup function.
Definition BS_thread_pool.hpp:1804
static std::size_t determine_thread_count(const std::size_t num_threads) noexcept(!thread_pool_native_extensions)
Determine how many threads the pool should have, based on the parameter passed to the constructor or ...
Definition BS_thread_pool.hpp:2125
std::size_t get_tasks_total() const
Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread....
Definition BS_thread_pool.hpp:1681
bool wait_for(const std::chrono::duration< R, P > &duration)
Wait for tasks to be completed, but stop waiting after the specified duration has passed.
Definition BS_thread_pool.hpp:1996
void detach_bulk(const I first, const I last, const priority_t priority=0)
Submit an iterator range containing functions with no arguments and no return values into the task qu...
Definition BS_thread_pool.hpp:1545
void detach_bulk(C &container, const priority_t priority=0)
Submit a container of functions with no arguments and no return values into the task queue,...
Definition BS_thread_pool.hpp:1577
void reset(F &&init)
Reset the pool with the default number of threads and a new initialization function....
Definition BS_thread_pool.hpp:1765
N enqueue_sequence(const T first_index, const T index_after_last, F &&sequence, const priority_t priority=0)
A helper function for detach_sequence() and submit_sequence().
Definition BS_thread_pool.hpp:2228
std::size_t get_thread_count() const noexcept
Get the number of threads in the pool.
Definition BS_thread_pool.hpp:1692
N enqueue_blocks(const T first_index, const T index_after_last, F &&block, std::size_t num_blocks, const priority_t priority=0)
A helper function for detach_blocks() and submit_blocks().
Definition BS_thread_pool.hpp:2158
BS_THREAD_POOL_IF_PAUSE_ENABLED void unpause()
Unpause the pool. The workers will resume retrieving new tasks out of the queue. Only enabled if the ...
Definition BS_thread_pool.hpp:1950
void detach_task(F &&task, const priority_t priority=0)
Submit a function with no arguments and no return value into the task queue, with the specified prior...
Definition BS_thread_pool.hpp:1627
multi_future< R > submit_sequence(const T1 first_index, const T2 index_after_last, F &&sequence, const priority_t priority=0)
Submit a sequence of tasks enumerated by indices to the queue, with the specified priority....
Definition BS_thread_pool.hpp:1924
task_t pop_task()
Pop a task from the queue.
Definition BS_thread_pool.hpp:2251
void reset()
Reset the pool with the default number of threads (as if constructed with the default constructor)....
Definition BS_thread_pool.hpp:1744
bool wait_until(const std::chrono::time_point< C, D > &timeout_time)
Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
Definition BS_thread_pool.hpp:2029
void wait()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
Definition BS_thread_pool.hpp:1964
multi_future< R > submit_bulk(C &container, const priority_t priority=0)
Submit a container of functions with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:1884
void destroy_threads()
Destroy the threads in the pool.
Definition BS_thread_pool.hpp:2108
std::size_t get_tasks_running() const
Get the number of tasks currently being executed by the threads.
Definition BS_thread_pool.hpp:1670
~thread_pool() noexcept
Destruct the thread pool. Waits for all tasks to complete, then destroys all threads....
Definition BS_thread_pool.hpp:1495
void detach_sequence(const T1 first_index, const T2 index_after_last, F &&sequence, const priority_t priority=0)
Submit a sequence of tasks enumerated by indices to the queue, with the specified priority....
Definition BS_thread_pool.hpp:1614
softfloat & operator=(const softfloat &c)
std::ostream & operator<<(std::ostream &, const DualQuat< _Tp > &)
__device__ __forceinline__ uchar1 operator<(const uchar1 &a, const uchar1 &b)
__device__ __forceinline__ R operator()(index_type y, index_type x) const
__device__ __forceinline__ uchar1 operator!=(const uchar1 &a, const uchar1 &b)
__device__ __forceinline__ uchar1 operator>(const uchar1 &a, const uchar1 &b)
__device__ __forceinline__ uchar4 operator==(const uchar4 &a, const uchar4 &b)
__device__ __forceinline__ uchar3 operator<=(const uchar3 &a, const uchar3 &b)
__device__ __forceinline__ uchar1 operator>=(const uchar1 &a, const uchar1 &b)
A namespace used by Barak Shoshany's projects.
Definition BS_thread_pool.hpp:134
std::uint8_t opt_t
The type used for the bitmask template parameter of the thread pool.
Definition BS_thread_pool.hpp:244
constexpr bool thread_pool_native_extensions
A flag indicating whether the thread pool library's native extensions are enabled.
Definition BS_thread_pool.hpp:238
std::int8_t priority_t
A type used to indicate the priority of a task. Defined to be a signed integer with a width of exactl...
Definition BS_thread_pool.hpp:392
constexpr bool thread_pool_import_std
A flag indicating whether the thread pool library imported the C++23 Standard Library module using im...
Definition BS_thread_pool.hpp:226
typename common_index_type< T1, T2 >::type common_index_type_t
A helper type alias to obtain the common type from the template BS::common_index_type.
Definition BS_thread_pool.hpp:1400
constexpr version thread_pool_version(BS_THREAD_POOL_VERSION_MAJOR, BS_THREAD_POOL_VERSION_MINOR, BS_THREAD_POOL_VERSION_PATCH)
The version of the thread pool library.
constexpr bool thread_pool_module
A flag indicating whether the thread pool library was compiled as a C++20 module.
Definition BS_thread_pool.hpp:214
pr
An enum containing some pre-defined priorities for convenience.
Definition BS_thread_pool.hpp:398
std::thread thread_t
The type of threads to use. In C++17 we use std::thread.
Definition BS_thread_pool.hpp:381
tp
An enumeration class of flags to be used in the bitmask template parameter of BS::thread_pool to enab...
Definition BS_thread_pool.hpp:250
@ none
No optional features enabled.
@ wait_deadlock_checks
Enable wait deadlock checks.
@ priority
Enable task priority.
GOpaque< Size > size(const GMat &src)
typename std::enable_if< B, T >::type enable_if_t
cv::String join(const cv::String &base, const cv::String &path)
A function object class used by detach_blocks() and submit_blocks() to execute a block function over ...
Definition BS_thread_pool.hpp:672
A meta-programming template to determine the common type of two integer types. Unlike std::common_typ...
Definition BS_thread_pool.hpp:1360
A function object class used by detach_loop() and submit_loop() to execute a loop function over a spe...
Definition BS_thread_pool.hpp:691
A helper struct to store a task with an assigned priority.
Definition BS_thread_pool.hpp:410
task_t task
The task. It is mutable so it can be moved out of the const reference returned by std::priority_queue...
Definition BS_thread_pool.hpp:434
friend bool operator<(const pr_task &lhs, const pr_task &rhs) noexcept
Compare the priority of two tasks.
Definition BS_thread_pool.hpp:426
pr_task(task_t &&task_, const priority_t priority_=0) noexcept(std::is_nothrow_move_constructible_v< task_t >)
Construct a new task with an assigned priority.
Definition BS_thread_pool.hpp:417
A function object class used by detach_sequence() and submit_sequence() to execute a sequence functio...
Definition BS_thread_pool.hpp:712
A class that takes a function with a return value (but no arguments), and constructs a task with no r...
Definition BS_thread_pool.hpp:729
A struct used to store a version number, which can be checked and compared at compilation time.
Definition BS_thread_pool.hpp:144