Autonomy Software C++ 24.5.1
Welcome to the Autonomy Software repository of the Mars Rover Design Team (MRDT) at Missouri University of Science and Technology (Missouri S&T)! API reference contains the source code and other resources for the development of the autonomy software for our Mars rover. The Autonomy Software project aims to compete in the University Rover Challenge (URC) by demonstrating advanced autonomous capabilities and robust navigation algorithms.
Loading...
Searching...
No Matches
AutonomyThread.hpp
Go to the documentation of this file.
1
14#ifndef AUTONOMYTHREAD_H
15#define AUTONOMYTHREAD_H
16
17#include "../util/IPS.hpp"
18
20#include "../../external/threadpool/include/BS_thread_pool.hpp"
21#include <atomic>
22#include <chrono>
23#include <condition_variable>
24#include <random>
25#include <sstream>
26#include <vector>
27
29
30
38template<class T>
40{
41 public:
43 // Define public enumerators specific to this class.
45
46 // Define an enum for storing this classes state.
47 enum class AutonomyThreadState
48 {
49 eStarting,
50 eRunning,
51 eStopping,
52 eStopped
53 };
54
55 // Define an enum for storing thread priority levels. These are used as an abstraction layer for the BS::pr enum, so we don't have to include the thread pool
56 // header in child classes just to set priorities.
57 enum class AutonomyThreadPriority
58 {
59 eLowest = BS::pr::lowest, // Thread will be scheduled less often and may not be able to reach the max IPS if the system is under heavy load.
60 eLow = BS::pr::low, // Thread will be scheduled less often and may not be able to reach the max IPS if the system is under heavy load.
61 eNormal = BS::pr::normal, // Default. Thread will be scheduled normally and should be able to reach the max IPS if the system is not under heavy load.
62 eHigh = BS::pr::high, // Thread will be scheduled more often and should be able to reach the max IPS even if the system is under
63 // heavy load, but it may starve other threads of CPU time.
64 eHighest = BS::pr::highest // Thread will be scheduled as often as possible and should be able to reach the max IPS even if the system is under heavy load.
65 };
66
68 // Declare and define public class methods.
70
78 {
79 // Initialize member variables.
80 m_bStopThreads = false;
81 m_eThreadState = AutonomyThreadState::eStopped;
82 m_nMainThreadMaxIterationPerSecond = 0;
83
84 // Generate a random UUID for this thread.
85 m_szThreadUUID = this->GenerateUUIDV4();
86 }
87
88
99 {
100 // Tell all threads to stop executing user code.
101 m_bStopThreads = true;
102 // Update thread state.
103 m_eThreadState = AutonomyThreadState::eStopping;
104
105 // Pause and clear pool queues.
106 m_thPool.pause();
107 m_thPool.purge();
108 m_thMainThread.pause();
109 m_thMainThread.purge();
110
111 // Wait for all pools to finish.
112 m_thPool.wait();
113 m_thMainThread.wait();
114 // Update thread state.
115 m_eThreadState = AutonomyThreadState::eStopped;
116 }
117
118
134 void Start()
135 {
136 // Tell any open thread to stop.
137 m_bStopThreads = true;
138 // Update thread state.
139 m_eThreadState = AutonomyThreadState::eStopping;
140
141 // Pause queuing of new tasks to the threads, then purge them.
142 m_thPool.pause();
143 m_thPool.purge();
144 m_thMainThread.pause();
145 m_thMainThread.purge();
146
147 // Wait for loop, pool and main thread to join.
148 this->Join();
149
150 // Update thread state.
151 m_eThreadState = AutonomyThreadState::eStarting;
152 // Clear results vector.
153 m_vPoolReturns.clear();
154 // Reset thread stop toggle.
155 m_bStopThreads = false;
156
157 // Submit single task to pool queue and store resulting future. Still using pool, as it's scheduling is more efficient.
158 std::future<void> fuMainReturn = m_thMainThread.submit_task(
159 [this]()
160 {
161 // Note: BS::this_thread::set_os_thread_priority was completely removed in BS v5.1.0.
162 // Priority is kept in memory but not applied to OS threads natively by the library anymore.
163 this->RunThread(m_bStopThreads);
164 });
165
166 // Unpause pool queues.
167 m_thPool.unpause();
168 m_thMainThread.unpause();
169
170 // Block until thread is started or currently stopping if thread start failed.
171 std::unique_lock<std::mutex> lkStartLock(m_muThreadRunningConditionMutex);
172 m_cdThreadRunningCondition.wait(lkStartLock,
173 [this]
174 { return this->m_eThreadState == AutonomyThreadState::eRunning || this->m_eThreadState == AutonomyThreadState::eStopping; });
175 }
176
177
188 {
189 // Signal for any open threads to stop executing,
190 m_bStopThreads = true;
191 // Update thread state.
192 m_eThreadState = AutonomyThreadState::eStopping;
193 }
194
195
203 void Join()
204 {
205 // Wait for pool to finish all tasks.
206 m_thPool.wait();
207 // Wait for main thread to finish.
208 m_thMainThread.wait();
209
210 // Update thread state.
211 m_eThreadState = AutonomyThreadState::eStopped;
212 }
213
214
224 bool Joinable() const
225 { // Check current number of running and queued tasks.
226 return (m_thMainThread.get_tasks_total() <= 0 && m_thPool.get_tasks_total() <= 0);
227 }
228
229
237 AutonomyThreadState GetThreadState() const { return m_eThreadState; }
238
239
247 std::string GetThreadUUID() const { return m_szThreadUUID; }
248
249
257 IPS& GetIPS() { return m_IPS; }
258
259
269 void SetMainThreadPriority(AutonomyThreadPriority ePriority) { m_eMainThreadPriority = static_cast<BS::pr>(ePriority); }
270
271
281 void SetPoolThreadPriority(AutonomyThreadPriority ePriority)
282 {
283 // Update member variable.
284 m_ePoolThreadPriority = static_cast<BS::pr>(ePriority);
285 // Native OS priority extensions were removed in BS v5.1.0, so we just reset thread count.
286 m_thPool.reset(m_thPool.get_thread_count());
287 }
288
289 protected:
291 // Declare protected objects.
293 IPS m_IPS = IPS();
294
296 // Declare and define protected class methods.
298
299
327 void RunPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads = 2, const bool bForceStopCurrentThreads = false)
328 {
329 // Check if the pools need to be resized.
330 if (m_thPool.get_thread_count() != nNumThreads)
331 {
332 // Pause queuing of new tasks to the threads, then purge them.
333 m_thPool.pause();
334 m_thPool.purge();
335 // Wait for open threads to terminate, then resize the pool.
336 m_thPool.reset(nNumThreads);
337 // Unpause queue.
338 m_thPool.unpause();
339
340 // Clear results vector.
341 m_vPoolReturns.clear();
342 }
343 // Check if the current pool tasks should be stopped before queueing more tasks.
344 else if (bForceStopCurrentThreads)
345 {
346 // Pause queuing of new tasks to the threads, then purge them.
347 m_thPool.pause();
348 m_thPool.purge();
349 // Wait for threadpool to join.
350 m_thPool.wait();
351 // Unpause queue.
352 m_thPool.unpause();
353 }
354
355 // Loop nNumThreads times and queue tasks.
356 for (unsigned int nIter = 0; nIter < nNumTasksToQueue; ++nIter)
357 {
358 // Submit single task to pool queue.
359 m_vPoolReturns.emplace_back(m_thPool.submit_task(
360 [this]()
361 {
362 // Run user pool code without lock.
363 this->PooledLinearCode();
364 }));
365 }
366 }
367
368
399 void RunDetachedPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads = 2, const bool bForceStopCurrentThreads = false)
400 {
401 // Check if the pools need to be resized.
402 if (m_thPool.get_thread_count() != nNumThreads)
403 {
404 // Pause queuing of new tasks to the threads, then purge them.
405 m_thPool.pause();
406 m_thPool.purge();
407 // Wait for open threads to terminate, then resize the pool.
408 m_thPool.reset(nNumThreads);
409 // Unpause queue.
410 m_thPool.unpause();
411
412 // Clear results vector.
413 m_vPoolReturns.clear();
414 }
415 // Check if the current pool tasks should be stopped before queueing more tasks.
416 else if (bForceStopCurrentThreads)
417 {
418 // Pause queuing of new tasks to the threads, then purge them.
419 m_thPool.pause();
420 m_thPool.purge();
421 // Wait for threadpool to join.
422 m_thPool.wait();
423 // Unpause queue.
424 m_thPool.unpause();
425 }
426
427 // Loop nNumThreads times and queue tasks.
428 for (unsigned int nIter = 0; nIter < nNumTasksToQueue; ++nIter)
429 {
430 // Push single task to pool queue. No return value no control.
431 m_thPool.detach_task(
432 [this]()
433 {
434 // Run user code without lock.
435 this->PooledLinearCode();
436 });
437 }
438 }
439
440
466 template<typename N, typename F>
467 void ParallelizeLoop(const int nNumThreads, const N tTotalIterations, F&& tLoopFunction)
468 {
469 // Create new thread pool (no specific template flags needed for basic detaching).
470 BS::thread_pool<> m_thLoopPool(nNumThreads);
471
472 m_thLoopPool.detach_blocks(0,
473 tTotalIterations,
474 [&tLoopFunction](const int nStart, const int nEnd)
475 {
476 // Call loop function without lock.
477 tLoopFunction(nStart, nEnd);
478 });
479
480 // Wait for loop to finish.
481 m_thLoopPool.wait();
482 }
483
484
492 void ClearPoolQueue() { m_thPool.purge(); }
493
494
502 void JoinPool() { m_thPool.wait(); }
503
504
514 bool PoolJoinable() const
515 {
516 // Check current number of running and queued tasks.
517 return (m_thPool.get_tasks_total() <= 0);
518 }
519
520
530 void SetMainThreadIPSLimit(int nMaxIterationsPerSecond = 0)
531 {
532 // Assign member variable.
533 m_nMainThreadMaxIterationPerSecond = nMaxIterationsPerSecond;
534 }
535
536
544 int GetPoolNumOfThreads() { return m_thPool.get_thread_count(); }
545
546
554 int GetPoolQueueLength() { return m_thPool.get_tasks_queued(); }
555
556
568 std::vector<T> GetPoolResults()
569 {
570 // Create instance variable.
571 std::vector<T> vResults;
572
573 // Loop the pool futures and get result.
574 for (std::future<T> fResult : m_vPoolReturns)
575 {
576 // Store returned value.
577 vResults.emplace_back(fResult.get());
578 }
579
580 // Clear pool returns member variable.
581 m_vPoolReturns.clear();
582
583 return vResults;
584 }
585
586
595 {
596 // Return member variable value.
597 return m_nMainThreadMaxIterationPerSecond;
598 }
599
600 private:
602 // Declare private class member variables.
604
605 BS::pr m_eMainThreadPriority = BS::pr::normal;
606 BS::pr m_ePoolThreadPriority = BS::pr::normal;
607
608 // BS::thread_pool requires the BS::tp::pause flag to enable pause/unpause
611
612 std::vector<std::future<T>> m_vPoolReturns;
613 std::atomic_bool m_bStopThreads;
614 std::atomic<AutonomyThreadState> m_eThreadState;
615 std::mutex m_muThreadRunningConditionMutex;
616 std::condition_variable m_cdThreadRunningCondition;
617 int m_nMainThreadMaxIterationPerSecond;
618 std::string m_szThreadUUID;
619
621 // Declare and/or define private methods.
623
624 // Declare interface class pure virtual functions. (These must be overriden by inheritor.)
625 virtual void ThreadedContinuousCode() = 0; // This is where user's main single threaded and continuously looping code will go.
626 virtual T PooledLinearCode() = 0; // This is where user's offshoot, highly parallelizable code will go. Helpful for intensive short-lived tasks.
627 // Can be ran from inside the ThreadedContinuousCode() method.
628
629 // Declare and define private interface methods.
630
640 void RunThread(std::atomic_bool& bStopThread)
641 {
642 // Declare instance variables.
643 std::chrono::_V2::system_clock::time_point tmStartTime;
644
645 // Loop until stop flag is set.
646 while (!bStopThread)
647 {
648 // Check if max IPS limit has been set.
649 if (m_nMainThreadMaxIterationPerSecond > 0)
650 {
651 // Get start execution time.
652 tmStartTime = std::chrono::high_resolution_clock::now();
653 }
654
655 // Call method containing user code.
656 this->ThreadedContinuousCode();
657
658 // Check if max IPS limit has been set.
659 if (m_nMainThreadMaxIterationPerSecond > 0)
660 {
661 // Get end execution time.
662 std::chrono::_V2::system_clock::time_point tmEndTime = std::chrono::high_resolution_clock::now();
663 // Get execution time of user code.
664 std::chrono::microseconds tmElapsedTime = std::chrono::duration_cast<std::chrono::microseconds>(tmEndTime - tmStartTime);
665 // Check if the elapsed time is slower than the max iterations per seconds.
666 if (tmElapsedTime.count() < (1.0 / m_nMainThreadMaxIterationPerSecond) * 1000000)
667 {
668 // Calculate the time to wait to stay under IPS cap.
669 int nSleepTime = ((1.0 / m_nMainThreadMaxIterationPerSecond) * 1000000) - tmElapsedTime.count();
670 // Make this thread sleep for the remaining time.
671 std::this_thread::sleep_for(std::chrono::microseconds(nSleepTime));
672 }
673 }
674
675 // Check if thread state needs to be updated.
676 if (m_eThreadState != AutonomyThreadState::eRunning && m_eThreadState != AutonomyThreadState::eStopping)
677 {
678 // Update thread state to running.
679 m_eThreadState = AutonomyThreadState::eRunning;
680 // Notify waiting start method that thread is now running.
681 m_cdThreadRunningCondition.notify_all();
682 }
683
684 // Call iteration per second tracking tick.
685 m_IPS.Tick();
686 }
687
688 // Notify waiting start method that thread is now stopping.
689 m_cdThreadRunningCondition.notify_all();
690 }
691
692
701 std::string GenerateUUIDV4()
702 {
703 static std::random_device stdRandomDevice;
704 static std::mt19937 stdGen(stdRandomDevice());
705 static std::uniform_int_distribution<> stdDist1(0, 15);
706 static std::uniform_int_distribution<> stdDist2(8, 11);
707
708 std::stringstream stdStream;
709 stdStream << std::hex;
710 for (int nIter = 0; nIter < 8; nIter++)
711 {
712 stdStream << stdDist1(stdGen);
713 }
714 stdStream << "-";
715 for (int nIter = 0; nIter < 4; nIter++)
716 {
717 stdStream << stdDist1(stdGen);
718 }
719 stdStream << "-4";
720 for (int nIter = 0; nIter < 3; nIter++)
721 {
722 stdStream << stdDist1(stdGen);
723 }
724 stdStream << "-";
725 stdStream << stdDist2(stdGen);
726 for (int nIter = 0; nIter < 3; nIter++)
727 {
728 stdStream << stdDist1(stdGen);
729 }
730 stdStream << "-";
731 for (int nIter = 0; nIter < 12; nIter++)
732 {
733 stdStream << stdDist1(stdGen);
734 }
735 return stdStream.str();
736 }
737};
738
739#endif
Interface class used to easily multithread a child class.
Definition AutonomyThread.hpp:40
void Join()
Waits for thread to finish executing and then closes thread. This method will block the calling code ...
Definition AutonomyThread.hpp:203
void ParallelizeLoop(const int nNumThreads, const N tTotalIterations, F &&tLoopFunction)
Given a ref-qualified looping function and an arbitrary number of iterations, this method will divide...
Definition AutonomyThread.hpp:467
void RunPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads=2, const bool bForceStopCurrentThreads=false)
When this method is called, it starts/adds tasks to a thread pool that runs nNumTasksToQueue copies o...
Definition AutonomyThread.hpp:327
int GetMainThreadMaxIPS() const
Accessor for the Main Thread Max I P S private member.
Definition AutonomyThread.hpp:594
void SetMainThreadIPSLimit(int nMaxIterationsPerSecond=0)
Mutator for the Main Thread Max I P S private member.
Definition AutonomyThread.hpp:530
AutonomyThread()
Construct a new Autonomy Thread object.
Definition AutonomyThread.hpp:77
int GetPoolNumOfThreads()
Accessor for the Pool Num Of Threads private member.
Definition AutonomyThread.hpp:544
std::vector< T > GetPoolResults()
Accessor for the Pool Results private member. The action of getting results will destroy and remove t...
Definition AutonomyThread.hpp:568
std::string GetThreadUUID() const
Accessor for the Thread U U I D private member.
Definition AutonomyThread.hpp:247
void RequestStop()
Signals threads to stop executing user code, terminate. DOES NOT JOIN. This method will not force the...
Definition AutonomyThread.hpp:187
bool Joinable() const
Check if the code within the thread and all pools created by it are finished executing and the thread...
Definition AutonomyThread.hpp:224
IPS & GetIPS()
Accessor for the Frame I P S private member.
Definition AutonomyThread.hpp:257
int GetPoolQueueLength()
Accessor for the Pool Queue Size private member.
Definition AutonomyThread.hpp:554
void ClearPoolQueue()
Clears any tasks waiting to be ran in the queue, tasks currently running will remain running.
Definition AutonomyThread.hpp:492
void Start()
When this method is called, it starts a new thread that runs the code within the ThreadedContinuousCo...
Definition AutonomyThread.hpp:134
void SetMainThreadPriority(AutonomyThreadPriority ePriority)
Set the OS priority for the main continuous thread.
Definition AutonomyThread.hpp:269
std::string GenerateUUIDV4()
Generates a random UUID v4 string. This is useful for generating unique IDs for files,...
Definition AutonomyThread.hpp:701
void SetPoolThreadPriority(AutonomyThreadPriority ePriority)
Set the OS priority for the highly parallelized pool threads.
Definition AutonomyThread.hpp:281
void RunThread(std::atomic_bool &bStopThread)
This method is ran in a separate thread. It is a middleware between the class member thread and the u...
Definition AutonomyThread.hpp:640
virtual ~AutonomyThread()
Destroy the Autonomy Thread object. If the parent object or main thread is destroyed or exited while ...
Definition AutonomyThread.hpp:98
void RunDetachedPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads=2, const bool bForceStopCurrentThreads=false)
When this method is called, it starts a thread pool full of threads that don't return std::futures (l...
Definition AutonomyThread.hpp:399
bool PoolJoinable() const
Check if the internal pool threads are done executing code and the queue is empty.
Definition AutonomyThread.hpp:514
void JoinPool()
Waits for pool to finish executing tasks. This method will block the calling code until thread is fin...
Definition AutonomyThread.hpp:502
AutonomyThreadState GetThreadState() const
Accessor for the Threads State private member.
Definition AutonomyThread.hpp:237
A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread pool class.
Definition BS_thread_pool.hpp:1429
void detach_blocks(const T1 first_index, const T2 index_after_last, F &&block, const std::size_t num_blocks=0, const priority_t priority=0)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:1531
std::future< R > submit_task(F &&task, const priority_t priority=0)
Submit a function with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:1939
BS_THREAD_POOL_IF_PAUSE_ENABLED void pause()
Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue,...
Definition BS_thread_pool.hpp:1726
void purge()
Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected,...
Definition BS_thread_pool.hpp:1735
std::size_t get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition BS_thread_pool.hpp:1659
std::size_t get_tasks_total() const
Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread....
Definition BS_thread_pool.hpp:1681
std::size_t get_thread_count() const noexcept
Get the number of threads in the pool.
Definition BS_thread_pool.hpp:1692
BS_THREAD_POOL_IF_PAUSE_ENABLED void unpause()
Unpause the pool. The workers will resume retrieving new tasks out of the queue. Only enabled if the ...
Definition BS_thread_pool.hpp:1950
void detach_task(F &&task, const priority_t priority=0)
Submit a function with no arguments and no return value into the task queue, with the specified prior...
Definition BS_thread_pool.hpp:1627
void reset()
Reset the pool with the default number of threads (as if constructed with the default constructor)....
Definition BS_thread_pool.hpp:1744
void wait()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
Definition BS_thread_pool.hpp:1964
This util class provides an easy way to keep track of iterations per second for any body of code.
Definition IPS.hpp:30
void Tick()
This method is used to update the iterations per second counter and recalculate all of the IPS metric...
Definition IPS.hpp:138
pr
An enum containing some pre-defined priorities for convenience.
Definition BS_thread_pool.hpp:398