ctrl-utils
thread_pool.h
1 
10 #ifndef CTRL_UTILS_THREAD_POOL_H_
11 #define CTRL_UTILS_THREAD_POOL_H_
12 
#include <ctrl_utils/atomic_queue.h>

#include <atomic>      // std::atomic
#include <cstddef>     // size_t
#include <exception>   // std::current_exception, std::make_exception_ptr
#include <functional>  // std::function
#include <future>      // std::future, std::promise
#include <memory>      // std::make_shared, std::shared_ptr
#include <stdexcept>   // std::runtime_error
#include <thread>      // std::thread
#include <utility>     // std::make_pair, std::move, std::pair
#include <vector>      // std::vector
20 
21 namespace ctrl_utils {
22 
23 template <typename T>
24 class ThreadPool {
25  public:
32  ThreadPool(size_t num_threads) {
33  if (num_threads == 0) {
34  num_threads = std::thread::hardware_concurrency();
35  }
36  threads_.reserve(num_threads);
37  for (size_t i = 0; i < num_threads; i++) {
38  threads_.emplace_back([this]() { InfiniteLoop(); });
39  // threads_.back().detach();
40  }
41  }
42 
46  virtual ~ThreadPool() {
47  Terminate();
48  for (std::thread& t : threads_) {
49  if (t.joinable()) t.join();
50  }
51  }
52 
59  std::future<T> Submit(std::function<T()>&& job) {
60  auto promise = std::make_shared<std::promise<T>>();
61  jobs_.Push(std::make_pair(std::move(job), promise));
62  return promise->get_future();
63  }
64 
65  std::future<T> Submit(std::function<T()>& job) {
66  auto promise = std::make_shared<std::promise<T>>();
67  jobs_.Push(std::make_pair(job, promise));
68  return promise->get_future();
69  }
70 
76  void Terminate() {
77  terminate_ = true;
78  jobs_.Terminate();
79  }
80 
81  private:
82  using Promise = std::shared_ptr<std::promise<T>>;
83 
87  void InfiniteLoop() {
88  while (!terminate_) {
89  std::pair<std::function<T()>, Promise> job_promise = jobs_.Pop();
90  std::function<T()>& job = job_promise.first;
91  Promise& promise = job_promise.second;
92 
93  // If terminated, raise an exception instead of processing.
94  if (terminate_) {
95  if (!promise) return;
96  promise->set_exception(std::make_exception_ptr(
97  std::runtime_error("ThreadPool::Terminate() called.")));
98  return;
99  }
100 
101  // Run the job and return the promised value.
102  ExecuteJob(promise, job);
103  }
104  }
105 
109  void ExecuteJob(Promise& promise, std::function<T()>& job) {
110  promise->set_value(job());
111  }
112 
113  std::vector<std::thread> threads_;
114 
115  AtomicQueue<std::pair<std::function<T()>, Promise>> jobs_;
116 
117  std::sig_atomic_t terminate_ = false;
118 };
119 
123 template <>
124 void ThreadPool<void>::ExecuteJob(Promise& promise,
125  std::function<void()>& job) {
126  job();
127  promise->set_value();
128 }
129 
130 } // namespace ctrl_utils
131 
132 #endif // CTRL_UTILS_THREAD_POOL_H_
T Pop()
Definition: atomic_queue.h:31
void Push(const T &item)
Definition: atomic_queue.h:52
void Terminate()
Definition: atomic_queue.h:88
Definition: thread_pool.h:24
std::future< T > Submit(std::function< T()> &&job)
Definition: thread_pool.h:59
void Terminate()
Definition: thread_pool.h:76
virtual ~ThreadPool()
Definition: thread_pool.h:46
ThreadPool(size_t num_threads)
Definition: thread_pool.h:32
Definition: ctrl_utils.cc:18