ctrl-utils
atomic_queue.h
1 
10 #ifndef CTRL_UTILS_ATOMIC_QUEUE_H_
11 #define CTRL_UTILS_ATOMIC_QUEUE_H_
12 
13 #include <algorithm> // std::swap
14 #include <condition_variable> // std::condition_variable
15 #include <csignal> // std::sig_atomic_t
16 #include <exception> // std::runtime_error
17 #include <mutex> // std::mutex, std::unique_lock
18 #include <queue> // std::queue
19 #include <vector> // std::vector
20 
21 namespace ctrl_utils {
22 
23 template <typename T>
24 class AtomicQueue {
25  public:
26  virtual ~AtomicQueue() { Terminate(); }
27 
31  T Pop() {
32  T value;
33  Pop(value);
34  return value;
35  }
36 
40  void Pop(T& value) {
41  std::unique_lock<std::mutex> lock(m_);
42  cv_.wait(lock, [this]() { return !queue_.empty() || terminate_; });
43  if (terminate_) return;
44 
45  std::swap(value, queue_.front());
46  queue_.pop();
47  }
48 
52  void Push(const T& item) {
53  {
54  std::unique_lock<std::mutex> lock(m_);
55  queue_.push(item);
56  }
57  cv_.notify_one();
58  }
59 
63  void Push(T&& item) {
64  {
65  std::unique_lock<std::mutex> lock(m_);
66  queue_.push(std::move(item));
67  }
68  cv_.notify_one();
69  }
70 
74  template <class... Args>
75  void Emplace(Args&&... args) {
76  {
77  std::unique_lock<std::mutex> lock(m_);
78  queue_.emplace(args...);
79  }
80  cv_.notify_one();
81  }
82 
88  void Terminate() {
89  terminate_ = true;
90  cv_.notify_all();
91  }
92 
93  protected:
94  std::queue<T> queue_;
95  std::mutex m_;
96  std::condition_variable cv_;
97  std::sig_atomic_t terminate_ = false;
98 };
99 
103 template <typename T>
105  public:
106  explicit AtomicBuffer(size_t size) : queue_(size) {}
107 
108  virtual ~AtomicBuffer() { Terminate(); }
109 
113  T Pop() {
114  T value;
115  Pop(value);
116  return value;
117  }
118 
122  void Pop(T& value) {
123  std::unique_lock<std::mutex> lock(m_);
124  cv_.wait(lock, [this]() { return num_unread_ > 0 || terminate_; });
125  if (terminate_) return;
126 
127  std::swap(value, queue_[idx_read_]);
128  IncrementLoop(idx_read_);
129  --num_unread_;
130  }
131 
135  void Push(const T& item) {
136  {
137  std::unique_lock<std::mutex> lock(m_);
138  queue_[idx_write_] = item;
139  IncrementWrite();
140  }
141  cv_.notify_one();
142  }
143 
147  void Push(T&& item) {
148  {
149  std::unique_lock<std::mutex> lock(m_);
150  std::swap(queue_[idx_write_], item);
151  IncrementWrite();
152  }
153  cv_.notify_one();
154  }
155 
159  template <class... Args>
160  void Emplace(Args&&... args) {
161  {
162  std::unique_lock<std::mutex> lock(m_);
163  queue_[idx_write_] = T(args...);
164  IncrementWrite();
165  }
166  cv_.notify_one();
167  }
168 
174  void Terminate() {
175  terminate_ = true;
176  cv_.notify_all();
177  }
178 
179  protected:
180  void IncrementLoop(size_t& idx) {
181  if (++idx >= queue_.size()) idx = 0;
182  }
183  void IncrementWrite() {
184  if (num_unread_ >= queue_.size()) {
185  IncrementLoop(idx_read_);
186  } else {
187  ++num_unread_;
188  }
189  IncrementLoop(idx_write_);
190  }
191  std::vector<T> queue_;
192  std::mutex m_;
193  std::condition_variable cv_;
194  std::sig_atomic_t terminate_ = false;
195 
196  size_t num_unread_ = 0;
197  size_t idx_write_ = 0;
198  size_t idx_read_ = 0;
199 };
200 
201 } // namespace ctrl_utils
202 
203 #endif // CTRL_UTILS_ATOMIC_QUEUE_H_
Definition: argparse.h:83
Definition: atomic_queue.h:104
T Pop()
Definition: atomic_queue.h:113
void Push(const T &item)
Definition: atomic_queue.h:135
void Push(T &&item)
Definition: atomic_queue.h:147
void Emplace(Args &&... args)
Definition: atomic_queue.h:160
void Pop(T &value)
Definition: atomic_queue.h:122
void Terminate()
Definition: atomic_queue.h:174
Definition: atomic_queue.h:24
void Pop(T &value)
Definition: atomic_queue.h:40
T Pop()
Definition: atomic_queue.h:31
void Emplace(Args &&... args)
Definition: atomic_queue.h:75
void Push(const T &item)
Definition: atomic_queue.h:52
void Push(T &&item)
Definition: atomic_queue.h:63
void Terminate()
Definition: atomic_queue.h:88
Definition: ctrl_utils.cc:18