10 #ifndef CTRL_UTILS_THREAD_POOL_H_
11 #define CTRL_UTILS_THREAD_POOL_H_
13 #include <ctrl_utils/atomic_queue.h>
// Fragment of a function body taking num_threads (presumably the
// ThreadPool(size_t num_threads) constructor; its signature line is not part
// of this listing).
// A request for zero threads is replaced by the value reported by
// std::thread::hardware_concurrency().
// NOTE(review): hardware_concurrency() is permitted to return 0 when the
// count cannot be determined; in that case the loop below starts no workers.
// Confirm whether a minimum of one thread should be enforced.
33 if (num_threads == 0) {
34 num_threads = std::thread::hardware_concurrency();
// Reserve storage for all worker threads before creating them.
36 threads_.reserve(num_threads);
// Start num_threads workers; each one runs InfiniteLoop() on this object.
37 for (
size_t i = 0; i < num_threads; i++) {
38 threads_.emplace_back([
this]() { InfiniteLoop(); });
// Fragment (presumably from the destructor; its signature line is not part of
// this listing): wait for every worker thread that is still joinable.
48 for (std::thread& t : threads_) {
49 if (t.joinable()) t.join();
59 std::future<T>
Submit(std::function<T()>&& job) {
60 auto promise = std::make_shared<std::promise<T>>();
61 jobs_.
Push(std::make_pair(std::move(job), promise));
62 return promise->get_future();
65 std::future<T>
Submit(std::function<T()>& job) {
66 auto promise = std::make_shared<std::promise<T>>();
67 jobs_.
Push(std::make_pair(job, promise));
68 return promise->get_future();
// Shared handle to the std::promise<T> that delivers a job's outcome; it is
// stored in the job queue next to the job's callable.
82 using Promise = std::shared_ptr<std::promise<T>>;
// Fragment of the worker routine (presumably InfiniteLoop(); the signature and
// the surrounding control flow are not part of this listing).
// Take the next (job, promise) entry from the queue and name its two halves.
89 std::pair<std::function<T()>, Promise> job_promise = jobs_.
Pop();
90 std::function<T()>& job = job_promise.first;
91 Promise& promise = job_promise.second;
// Fail the promise with a runtime_error so the matching future reports an
// error instead of a value.
// NOTE(review): the condition guarding this call is not visible here;
// presumably it tests terminate_ — confirm against the full file.
96 promise->set_exception(std::make_exception_ptr(
97 std::runtime_error(
"ThreadPool::Terminate() called.")));
// Run the job and publish its result through the promise.
102 ExecuteJob(promise, job);
109 void ExecuteJob(Promise& promise, std::function<T()>& job) {
110 promise->set_value(job());
// Worker threads owned by the pool.
113 std::vector<std::thread> threads_;
// Queue of pending (job, promise) entries; Submit() pushes and the worker
// routine pops.
115 AtomicQueue<std::pair<std::function<T()>, Promise>> jobs_;
// Termination flag, initially false.
// NOTE(review): std::sig_atomic_t gives no inter-thread synchronisation
// guarantees; if this flag is read and written from different threads,
// std::atomic<bool> would be the appropriate type — confirm usage.
117 std::sig_atomic_t terminate_ =
false;
// ExecuteJob for T = void: the promise is completed without a value.
// NOTE(review): listing line 126 is missing from this extract; presumably it
// invokes job() before the promise is fulfilled — confirm against the file.
124 void ThreadPool<void>::ExecuteJob(Promise& promise,
125 std::function<
void()>& job) {
127 promise->set_value();
T Pop()
Definition: atomic_queue.h:31
void Push(const T &item)
Definition: atomic_queue.h:52
void Terminate()
Definition: atomic_queue.h:88
Definition: thread_pool.h:24
std::future< T > Submit(std::function< T()> &&job)
Definition: thread_pool.h:59
void Terminate()
Definition: thread_pool.h:76
virtual ~ThreadPool()
Definition: thread_pool.h:46
ThreadPool(size_t num_threads)
Definition: thread_pool.h:32
Definition: ctrl_utils.cc:18