EventCount.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H
11 #define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H
12 
13 // IWYU pragma: private
14 #include "./InternalHeaderCheck.h"
15 
16 namespace Eigen {
17 
// EventCount allows waiting for arbitrary predicates in non-blocking
// algorithms. Think of a condition variable, except that the wait predicate
// does not need to be protected by a mutex. Usage:
// Waiting thread does:
//
//   if (predicate)
//     return act();
//   EventCount::Waiter& w = waiters[my_index];
//   ec.Prewait();
//   if (predicate) {
//     ec.CancelWait();
//     return act();
//   }
//   ec.CommitWait(&w);
//
// Notifying thread does:
//
//   predicate = true;
//   ec.Notify(true);
//
// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
// cheap, but they are executed only if the preceding predicate check has
// failed.
//
// Algorithm outline:
// There are two main variables: predicate (managed by the user) and state_.
// The operation closely resembles Dekker's mutual exclusion algorithm:
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
// The waiting thread sets state_ then checks the predicate; the notifying
// thread sets the predicate then checks state_. Due to seq_cst fences between
// these operations it is guaranteed that either the waiter will see the
// predicate change and won't block, or the notifying thread will see the
// state_ change and will unblock the waiter, or both. It cannot happen that
// both threads miss each other's changes, which would lead to deadlock.
52 class EventCount {
53  public:
54  class Waiter;
55 
57  eigen_plain_assert(waiters.size() < (1 << kWaiterBits) - 1);
58  }
59 
60  EventCount(const EventCount&) = delete;
61  void operator=(const EventCount&) = delete;
62 
64  // Ensure there are no waiters.
66  }
67 
68  // Prewait prepares for waiting.
69  // After calling Prewait, the thread must re-check the wait predicate
70  // and then call either CancelWait or CommitWait.
71  void Prewait() {
72  uint64_t state = state_.load(std::memory_order_relaxed);
73  for (;;) {
74  CheckState(state);
75  uint64_t newstate = state + kWaiterInc;
76  CheckState(newstate);
77  if (state_.compare_exchange_weak(state, newstate, std::memory_order_seq_cst)) return;
78  }
79  }
80 
81  // CommitWait commits waiting after Prewait.
82  void CommitWait(Waiter* w) {
83  eigen_plain_assert((w->epoch & ~kEpochMask) == 0);
84  w->state = Waiter::kNotSignaled;
85  const uint64_t me = (w - &waiters_[0]) | w->epoch;
86  uint64_t state = state_.load(std::memory_order_seq_cst);
87  for (;;) {
88  CheckState(state, true);
89  uint64_t newstate;
90  if ((state & kSignalMask) != 0) {
91  // Consume the signal and return immediately.
92  newstate = state - kWaiterInc - kSignalInc;
93  } else {
94  // Remove this thread from pre-wait counter and add to the waiter stack.
95  newstate = ((state & kWaiterMask) - kWaiterInc) | me;
96  w->next.store(state & (kStackMask | kEpochMask), std::memory_order_relaxed);
97  }
98  CheckState(newstate);
99  if (state_.compare_exchange_weak(state, newstate, std::memory_order_acq_rel)) {
100  if ((state & kSignalMask) == 0) {
101  w->epoch += kEpochInc;
102  Park(w);
103  }
104  return;
105  }
106  }
107  }
108 
109  // CancelWait cancels effects of the previous Prewait call.
110  void CancelWait() {
111  uint64_t state = state_.load(std::memory_order_relaxed);
112  for (;;) {
113  CheckState(state, true);
114  uint64_t newstate = state - kWaiterInc;
115  // We don't know if the thread was also notified or not,
116  // so we should not consume a signal unconditionally.
117  // Only if number of waiters is equal to number of signals,
118  // we know that the thread was notified and we must take away the signal.
119  if (((state & kWaiterMask) >> kWaiterShift) == ((state & kSignalMask) >> kSignalShift)) newstate -= kSignalInc;
120  CheckState(newstate);
121  if (state_.compare_exchange_weak(state, newstate, std::memory_order_acq_rel)) return;
122  }
123  }
124 
125  // Notify wakes one or all waiting threads.
126  // Must be called after changing the associated wait predicate.
127  void Notify(bool notifyAll) {
128  std::atomic_thread_fence(std::memory_order_seq_cst);
129  uint64_t state = state_.load(std::memory_order_acquire);
130  for (;;) {
131  CheckState(state);
132  const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
133  const uint64_t signals = (state & kSignalMask) >> kSignalShift;
134  // Easy case: no waiters.
135  if ((state & kStackMask) == kStackMask && waiters == signals) return;
136  uint64_t newstate;
137  if (notifyAll) {
138  // Empty wait stack and set signal to number of pre-wait threads.
139  newstate = (state & kWaiterMask) | (waiters << kSignalShift) | kStackMask;
140  } else if (signals < waiters) {
141  // There is a thread in pre-wait state, unblock it.
142  newstate = state + kSignalInc;
143  } else {
144  // Pop a waiter from list and unpark it.
145  Waiter* w = &waiters_[state & kStackMask];
146  uint64_t next = w->next.load(std::memory_order_relaxed);
147  newstate = (state & (kWaiterMask | kSignalMask)) | next;
148  }
149  CheckState(newstate);
150  if (state_.compare_exchange_weak(state, newstate, std::memory_order_acq_rel)) {
151  if (!notifyAll && (signals < waiters)) return; // unblocked pre-wait thread
152  if ((state & kStackMask) == kStackMask) return;
153  Waiter* w = &waiters_[state & kStackMask];
154  if (!notifyAll) w->next.store(kStackMask, std::memory_order_relaxed);
155  Unpark(w);
156  return;
157  }
158  }
159  }
160 
161  private:
162  // State_ layout:
163  // - low kWaiterBits is a stack of waiters committed wait
164  // (indexes in waiters_ array are used as stack elements,
165  // kStackMask means empty stack).
166  // - next kWaiterBits is count of waiters in prewait state.
167  // - next kWaiterBits is count of pending signals.
168  // - remaining bits are ABA counter for the stack.
169  // (stored in Waiter node and incremented on push).
170  static const uint64_t kWaiterBits = 14;
171  static const uint64_t kStackMask = (1ull << kWaiterBits) - 1;
173  static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1) << kWaiterShift;
174  static const uint64_t kWaiterInc = 1ull << kWaiterShift;
175  static const uint64_t kSignalShift = 2 * kWaiterBits;
176  static const uint64_t kSignalMask = ((1ull << kWaiterBits) - 1) << kSignalShift;
177  static const uint64_t kSignalInc = 1ull << kSignalShift;
178  static const uint64_t kEpochShift = 3 * kWaiterBits;
179  static const uint64_t kEpochBits = 64 - kEpochShift;
180  static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
181  static const uint64_t kEpochInc = 1ull << kEpochShift;
182 
183  public:
184  class Waiter {
185  friend class EventCount;
186 
187  enum State {
191  };
192 
194  EIGEN_MUTEX mu;
195  EIGEN_CONDVAR cv;
197  unsigned state{kNotSignaled};
198  };
199 
200  private:
201  static void CheckState(uint64_t state, bool waiter = false) {
202  static_assert(kEpochBits >= 20, "not enough bits to prevent ABA problem");
203  const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
204  const uint64_t signals = (state & kSignalMask) >> kSignalShift;
205  eigen_plain_assert(waiters >= signals);
206  eigen_plain_assert(waiters < (1 << kWaiterBits) - 1);
207  eigen_plain_assert(!waiter || waiters > 0);
208  (void)waiters;
209  (void)signals;
210  }
211 
212  void Park(Waiter* w) {
213  EIGEN_MUTEX_LOCK lock(w->mu);
214  while (w->state != Waiter::kSignaled) {
215  w->state = Waiter::kWaiting;
216  w->cv.wait(lock);
217  }
218  }
219 
220  void Unpark(Waiter* w) {
221  for (Waiter* next; w; w = next) {
222  uint64_t wnext = w->next.load(std::memory_order_relaxed) & kStackMask;
223  next = wnext == kStackMask ? nullptr : &waiters_[internal::convert_index<size_t>(wnext)];
224  unsigned state;
225  {
226  EIGEN_MUTEX_LOCK lock(w->mu);
227  state = w->state;
228  w->state = Waiter::kSignaled;
229  }
230  // Avoid notifying if it wasn't waiting.
231  if (state == Waiter::kWaiting) w->cv.notify_one();
232  }
233  }
234 
235  std::atomic<uint64_t> state_;
237 };
238 
239 } // namespace Eigen
240 
241 #endif // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H
#define eigen_plain_assert(condition)
Definition: Assert.h:148
#define EIGEN_ALIGN_TO_AVOID_FALSE_SHARING
Definition: ConfigureVectorization.h:48
RowVector3d w
Definition: Matrix_resize_int.cpp:3
Definition: EventCount.h:184
uint64_t epoch
Definition: EventCount.h:196
EIGEN_MUTEX mu
Definition: EventCount.h:194
State
Definition: EventCount.h:187
@ kWaiting
Definition: EventCount.h:189
@ kNotSignaled
Definition: EventCount.h:188
@ kSignaled
Definition: EventCount.h:190
EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic< uint64_t > next
Definition: EventCount.h:193
unsigned state
Definition: EventCount.h:197
EIGEN_CONDVAR cv
Definition: EventCount.h:195
Definition: EventCount.h:52
MaxSizeVector< Waiter > & waiters_
Definition: EventCount.h:236
EventCount(MaxSizeVector< Waiter > &waiters)
Definition: EventCount.h:56
std::atomic< uint64_t > state_
Definition: EventCount.h:235
static const uint64_t kEpochMask
Definition: EventCount.h:180
static const uint64_t kStackMask
Definition: EventCount.h:171
static const uint64_t kSignalInc
Definition: EventCount.h:177
void CommitWait(Waiter *w)
Definition: EventCount.h:82
static const uint64_t kEpochShift
Definition: EventCount.h:178
static const uint64_t kWaiterMask
Definition: EventCount.h:173
void operator=(const EventCount &)=delete
void Notify(bool notifyAll)
Definition: EventCount.h:127
void Unpark(Waiter *w)
Definition: EventCount.h:220
static const uint64_t kWaiterInc
Definition: EventCount.h:174
static const uint64_t kEpochBits
Definition: EventCount.h:179
void CancelWait()
Definition: EventCount.h:110
static const uint64_t kWaiterShift
Definition: EventCount.h:172
static void CheckState(uint64_t state, bool waiter=false)
Definition: EventCount.h:201
EventCount(const EventCount &)=delete
static const uint64_t kSignalMask
Definition: EventCount.h:176
static const uint64_t kEpochInc
Definition: EventCount.h:181
void Park(Waiter *w)
Definition: EventCount.h:212
static const uint64_t kWaiterBits
Definition: EventCount.h:170
static const uint64_t kSignalShift
Definition: EventCount.h:175
void Prewait()
Definition: EventCount.h:71
~EventCount()
Definition: EventCount.h:63
The MaxSizeVector class.
Definition: MaxSizeVector.h:31
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t size() const
Definition: MaxSizeVector.h:115
std::uint64_t uint64_t
Definition: Meta.h:42
Namespace containing all symbols from the Eigen library.
Definition: bench_norm.cpp:70