RunQueue.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread
// (owner); operations on the back of the queue can be done by multiple
// threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but has no
// stray occupied elements). The owner operates on one end of this region, the
// remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and claiming of the last
// empty element) happens by means of a state variable in each element. The
// states are: empty, busy (in process of insertion or removal) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means
// of a CAS operation. The finishing transitions (busy->empty and busy->ready)
// are done with a plain store, as the element is exclusively owned by the
// current thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as
// the state. But that would require a malloc/free per operation for large,
// complex values (and this queue is designed to store std::function<()>).
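//
// A minimal usage sketch (illustrative; assumes Work = std::function<void()>,
// which this queue is designed to store, and an arbitrary power-of-two
// capacity of 1024):
//
//   RunQueue<std::function<void()>, 1024> q;
//   // Owner thread only:
//   q.PushFront([] { /* task */ });           // returns w back if full
//   std::function<void()> w = q.PopFront();   // default-constructed if empty
//   // Any other thread (steals are serialized by an internal mutex):
//   std::function<void()> stolen = q.PopBack();
//   if (stolen) stolen();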
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++) array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }
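  // For instance, the largest allowed kSize = 64 << 10 = 2^16 needs
  // log(kSize) + 1 = 17 low bits of the 32-bit front_/back_ for the rolling
  // index, leaving 15 high bits for the modification counter described below.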

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }
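  // For example, with kSize = 4 (kMask2 = 7), front_ = 2 encodes rolling
  // index 2 with modification counter 0; a successful PushFront stores
  // 2 + 1 + (4 << 1) = 11, i.e. rolling index 3 with counter 1 (see the
  // layout comment above front_ and back_ below).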

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
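    // Decrement only the rolling index in the low kMask2 bits (modulo
    // 2 * kSize) and keep the high modification-counter bits intact: the
    // counter is bumped by pushes only.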
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopBack() {
    if (Empty()) return Work();
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns the last half of the elements in the
  // queue. Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) continue;
        start = mid;
      } else {
        // Note: no need to store temporal kBusy, we exclusively own these
        // elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0) back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }
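  // For example, with 5 ready elements, mid = back + (5 - 1) / 2 = back + 2,
  // so the loop visits rolling indices back+2, back+1, back and steals 3 of
  // the 5 elements; the owner's half of the queue is left untouched.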

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Delete all the elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;

  enum State {
    kEmpty,
    kBusy,
    kReady,
  };

  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };

  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented on Push operations. This allows us to (1) distinguish
  // between empty and full conditions (if we used log(kSize) bits for the
  // position, these conditions would be indistinguishable); (2) obtain a
  // consistent snapshot of front_/back_ for the Size operation using the
  // modification counters.
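  // Concretely: the array slot for an endpoint is (index & kMask), the
  // rolling position used for size computations is (index & kMask2), and the
  // bits above kMask2 form the modification counter.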
  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> front_;
  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> back_;
  EIGEN_MUTEX mutex_;  // guards `PushBack` and `PopBack` (accesses `back_`)

  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING Elem array_[kSize];

  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
  // only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to not produce false positives (claim non-empty queue as
    // empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // Queue size estimate must agree with maybe zero check on the queue
        // empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }
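  // The emptiness test works because the front_ and back_ rolling indices
  // (the low kMask2 bits) coincide if and only if the occupied region is
  // empty: indices span 2 * kSize values, so a full queue (front exactly
  // kSize ahead of back) does not alias the empty state.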

  EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (EIGEN_PREDICT_FALSE(size < 0)) size += 2 * kSize;
    // Order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. push can
    // increment size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1, fix it.
    if (EIGEN_PREDICT_FALSE(size > static_cast<int>(kSize))) size = kSize;
    return static_cast<unsigned>(size);
  }
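  // Worked example: with kSize = 4, front & kMask2 = 1 and back & kMask2 = 6
  // give size = 1 - 6 = -5, fixed up to -5 + 8 = 3 (the occupied rolling
  // indices being 6, 7 and 0).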

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H