NonBlockingThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::EnvThread Thread;
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;          // Random generator state.
    int thread_id;          // Worker thread index in pool.
  };

  struct ThreadData {
    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
    std::unique_ptr<Thread> thread;
    std::atomic<unsigned> steal_partition;
    Queue queue;
  };

  ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {}

  ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment())
      : env_(env),
        num_threads_(num_threads),
        allow_spinning_(allow_spinning),
        spin_count_(
            // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is proportional to num_threads_ and
            // we assume that new work is scheduled at a constant rate, so we divide `kSpinCount` by the number of
            // threads and the number of spinning threads. The constant was picked based on a fair dice roll, tune it.
            allow_spinning && num_threads > 0 ? kSpinCount / kMaxSpinningThreads / num_threads : 0),
        thread_data_(num_threads),
        all_coprimes_(num_threads),
        waiters_(num_threads),
        global_steal_partition_(EncodePartition(0, num_threads_)),
        spinning_state_(0),
        blocked_(0),
        done_(false),
        cancelled_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads_);
    // Calculate coprimes of all numbers [1, num_threads].
    // Coprimes are used for random walks over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a random starting thread index t and calculate num_threads - 1 subsequent
    // indices as (t + coprime) % num_threads, we will cover all threads without
    // repetitions (effectively getting a pseudo-random permutation of thread
    // indices).
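    // For example, with num_threads = 4 and coprime 3, the walk starting at
    // t = 1 visits 1, 0, 3, 2 (each index (t + k * 3) % 4 exactly once); a
    // step size sharing a factor with 4, such as 2, would only ever visit
    // half of the queues.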
    eigen_plain_assert(num_threads_ < kMaxThreads);
    for (int i = 1; i <= num_threads_; ++i) {
      all_coprimes_.emplace_back(i);
      ComputeCoprimes(i, &all_coprimes_.back());
    }
#ifndef EIGEN_THREAD_LOCAL
    init_barrier_.reset(new Barrier(num_threads_ + 1));
#endif
    thread_data_.resize(num_threads_);
    for (int i = 0; i < num_threads_; i++) {
      SetStealPartition(i, EncodePartition(0, num_threads_));
      thread_data_[i].thread.reset(env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
#ifndef EIGEN_THREAD_LOCAL
    // Wait for workers to initialize per_thread_map_. Otherwise we might race
    // with them in Schedule or CurrentThreadId.
    init_barrier_->Wait();
#endif
  }

  ~ThreadPoolTempl() {
    done_ = true;

    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work arbitrarily long,
    // block, submit new work, unblock and otherwise live a full life.
    if (!cancelled_) {
      ec_.Notify(true);
    } else {
      // Since we were cancelled, there might be entries in the queues.
      // Empty them to prevent their destructor from asserting.
      for (size_t i = 0; i < thread_data_.size(); i++) {
        thread_data_[i].queue.Flush();
      }
    }
    // Join threads explicitly (by destroying) to avoid destruction order within
    // this class.
    for (size_t i = 0; i < thread_data_.size(); ++i) thread_data_[i].thread.reset();
  }

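  // Restricts which queues each worker considers in LocalSteal(). Each entry
  // partitions[i] is a half-open [start, end) range of worker indices for
  // thread i. For example, with four workers, passing
  // {{0, 2}, {0, 2}, {2, 4}, {2, 4}} makes LocalSteal() for threads 0 and 1
  // consider only each other's queues, and likewise for threads 2 and 3
  // (GlobalSteal() still covers the whole pool).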
  void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
    eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));

    // Pass this information to each thread queue.
    for (int i = 0; i < num_threads_; i++) {
      const auto& pair = partitions[i];
      unsigned start = pair.first, end = pair.second;
      AssertBounds(start, end);
      unsigned val = EncodePartition(start, end);
      SetStealPartition(i, val);
    }
  }

  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE { ScheduleWithHint(std::move(fn), 0, num_threads_); }

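  // Schedules fn with a placement hint: when the caller is not a worker of
  // this pool, the task is pushed onto the queue of a randomly chosen worker
  // in the half-open range [start, limit). For example, ScheduleWithHint(fn, 0, 2)
  // from an external thread enqueues fn on worker 0 or worker 1; a worker of
  // this pool always pushes onto its own queue regardless of the hint.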
  void ScheduleWithHint(std::function<void()> fn, int start, int limit) override {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue& q = thread_data_[pt->thread_id].queue;
      t = q.PushFront(std::move(t));
    } else {
      // A free-standing thread (or worker of another pool), push onto a random
      // queue.
      eigen_plain_assert(start < limit);
      eigen_plain_assert(limit <= num_threads_);
      int num_queues = limit - start;
      int rnd = Rand(&pt->rand) % num_queues;
      eigen_plain_assert(start + rnd < limit);
      Queue& q = thread_data_[start + rnd].queue;
      t = q.PushBack(std::move(t));
    }
    // Note: below we touch `this` after making the task available to worker
    // threads. Strictly speaking, this can lead to a racy use-after-free.
    // Consider that Schedule is called from a thread that is neither the main
    // thread nor a worker thread of this pool. Then, execution of the task
    // directly or indirectly completes the overall computation, which in turn
    // leads to destruction of `this`. We expect that such a scenario is
    // prevented by the program, that is, `this` is kept alive while any thread
    // can potentially be in Schedule.
    if (!t.f) {
      if (IsNotifyParkedThreadRequired()) {
        ec_.Notify(false);
      }
    } else {
      env_.ExecuteTask(t);  // Push failed, execute directly.
    }
  }

  // Tries to fetch a task for the calling worker thread, first from its own
  // queue and then by stealing.
  void MaybeGetTask(Task* t) {
    PerThread* pt = GetPerThread();
    Queue& q = thread_data_[pt->thread_id].queue;
    *t = q.PopFront();
    if (t->f) return;
    if (num_threads_ == 1) {
      // For num_threads_ == 1 there is no point in going through the expensive
      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
      // victim queues it might reverse the order in which ops are executed
      // compared to the order in which they are scheduled, which tends to be
      // counter-productive for the types of I/O workloads single thread pools
      // tend to be used for.
      for (int i = 0; i < spin_count_ && !t->f; ++i) *t = q.PopFront();
    } else {
      if (EIGEN_PREDICT_FALSE(!t->f)) *t = LocalSteal();
      if (EIGEN_PREDICT_FALSE(!t->f)) *t = GlobalSteal();
      if (EIGEN_PREDICT_FALSE(!t->f)) {
        if (allow_spinning_ && StartSpinning()) {
          for (int i = 0; i < spin_count_ && !t->f; ++i) *t = GlobalSteal();
          // Record in `spinning_state_` that we are no longer spinning.
          bool has_no_notify_task = StopSpinning();
          // If a task was submitted to the queue without a call to
          // `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
          // false), and we didn't steal anything above, we must try to
          // steal one more time, to make sure that this task will be
          // executed. We will not necessarily find it, because it might
          // have been already stolen by some other thread.
          if (has_no_notify_task && !t->f) *t = q.PopFront();
        }
      }
    }
  }

  void Cancel() EIGEN_OVERRIDE {
    cancelled_ = true;
    done_ = true;

    // Let each thread know it's been cancelled.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
    for (size_t i = 0; i < thread_data_.size(); i++) {
      thread_data_[i].thread->OnCancel();
    }
#endif

    // Wake up the threads without work to let them exit on their own.
    ec_.Notify(true);
  }

  int NumThreads() const EIGEN_FINAL { return num_threads_; }

  int CurrentThreadId() const EIGEN_FINAL {
    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  // Create a single atomic<unsigned> that encodes start and limit information
  // for each thread.
  // We expect num_threads_ < 65536, so we can store them in a single
  // std::atomic<unsigned>.
  // Exposed publicly as static functions so that external callers can reuse
  // this encode/decode logic for maintaining their own thread-safe copies of
  // scheduling and steal domain(s).
  static constexpr int kMaxPartitionBits = 16;
  static constexpr int kMaxThreads = 1 << kMaxPartitionBits;

  inline unsigned EncodePartition(unsigned start, unsigned limit) { return (start << kMaxPartitionBits) | limit; }

  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
    *limit = val & (kMaxThreads - 1);
    val >>= kMaxPartitionBits;
    *start = val;
  }
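
  // For example, EncodePartition(2, 5) yields (2 << 16) | 5 == 0x00020005, and
  // DecodePartition(0x00020005, &start, &limit) recovers start == 2 and
  // limit == 5.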

  void AssertBounds(int start, int end) {
    eigen_plain_assert(start >= 0);
    eigen_plain_assert(start < end);  // non-zero sized partition
    eigen_plain_assert(end <= num_threads_);
  }

  inline void SetStealPartition(size_t i, unsigned val) {
    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
  }

  inline unsigned GetStealPartition(int i) { return thread_data_[i].steal_partition.load(std::memory_order_relaxed); }

  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
    for (int i = 1; i <= N; i++) {
      unsigned a = i;
      unsigned b = N;
      // If GCD(a, b) == 1, then a and b are coprimes.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes->push_back(i);
      }
    }
  }
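
  // For example, ComputeCoprimes(4, &c) fills c with {1, 3} and
  // ComputeCoprimes(10, &c) fills c with {1, 3, 7, 9}: exactly the step sizes
  // for which the (t + coprime) % N walk above is a full permutation.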

  // Maximum number of threads that can spin in steal loop.
  static constexpr int kMaxSpinningThreads = 1;

  // The number of steal loop spin iterations before parking (this number is
  // divided by the number of threads, to get spin count for each thread).
  static constexpr int kSpinCount = 5000;

  // If there are enough active threads with empty pending-task queues, a thread
  // that runs out of work can just be parked without spinning, because these
  // active threads will go into a steal loop after finishing their current
  // tasks.
  //
  // In the worst case when all active threads are executing long/expensive
  // tasks, the next Schedule() will have to wait until one of the parked
  // threads is unparked; however, this should be very rare in practice.
  static constexpr int kMinActiveThreadsToStartSpinning = 4;

  struct SpinningState {
    // Spinning state layout:
    //
    // - Low 32 bits encode the number of threads that are spinning in steal
    //   loop.
    //
    // - High 32 bits encode the number of tasks that were submitted to the pool
    //   without a call to `ec_.Notify()`. This number can't be larger than
    //   the number of spinning threads. Each spinning thread, when it exits the
    //   spin loop must check if this number is greater than zero, and maybe
    //   make another attempt to steal a task and decrement it by one.
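    //
    // For example, the state value 0x0000000100000002 decodes to
    // num_spinning == 2 and num_no_notification == 1: two threads are in the
    // spin loop and one pending task was submitted without waking a parked
    // thread.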
    static constexpr uint64_t kNumSpinningMask = 0x00000000FFFFFFFF;
    static constexpr uint64_t kNumNoNotifyMask = 0xFFFFFFFF00000000;
    static constexpr uint64_t kNumNoNotifyShift = 32;

    uint64_t num_spinning;         // number of spinning threads
    uint64_t num_no_notification;  // number of tasks submitted without
                                   // notifying waiting threads

    // Decodes `spinning_state_` value.
    static SpinningState Decode(uint64_t state) {
      uint64_t num_spinning = (state & kNumSpinningMask);
      uint64_t num_no_notification = (state & kNumNoNotifyMask) >> kNumNoNotifyShift;

      eigen_plain_assert(num_no_notification <= num_spinning);
      return {num_spinning, num_no_notification};
    }

    // Encodes as `spinning_state_` value.
    uint64_t Encode() const {
      eigen_plain_assert(num_no_notification <= num_spinning);
      return (num_no_notification << kNumNoNotifyShift) | num_spinning;
    }
  };

  Environment env_;
  const int num_threads_;
  const bool allow_spinning_;
  const int spin_count_;
  MaxSizeVector<ThreadData> thread_data_;
  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  unsigned global_steal_partition_;
  std::atomic<uint64_t> spinning_state_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> done_;
  std::atomic<bool> cancelled_;
  EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
  std::unique_ptr<Barrier> init_barrier_;
  EIGEN_MUTEX per_thread_map_mutex_;  // Protects per_thread_map_.
  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

  unsigned NumBlockedThreads() const { return blocked_.load(); }
  unsigned NumActiveThreads() const { return num_threads_ - blocked_.load(); }

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
    std::unique_ptr<PerThread> new_pt(new PerThread());
    per_thread_map_mutex_.lock();
    bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
    eigen_plain_assert(insertOK);
    EIGEN_UNUSED_VARIABLE(insertOK);
    per_thread_map_mutex_.unlock();
    init_barrier_->Notify();
    init_barrier_->Wait();
#endif
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
    Task t;
    while (!cancelled_.load(std::memory_order_relaxed)) {
      MaybeGetTask(&t);
      // If we still don't have a task, wait for one. Return if thread pool is
      // in cancelled state.
      if (EIGEN_PREDICT_FALSE(!t.f)) {
        EventCount::Waiter* waiter = &waiters_[pt->thread_id];
        if (!WaitForWork(waiter, &t)) return;
      }
      if (EIGEN_PREDICT_TRUE(t.f)) env_.ExecuteTask(t);
    }
  }

  // Steal tries to steal work from other worker threads in the range [start,
  // limit) in best-effort manner.
  Task Steal(unsigned start, unsigned limit) {
    PerThread* pt = GetPerThread();
    const size_t size = limit - start;
    unsigned r = Rand(&pt->rand);
    // Reduce r into [0, size) range; this uses the trick from
    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
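    // ((uint64_t)r * size) >> 32 maps a uniform 32-bit r into [0, size)
    // without a division: e.g. for size == 4 the four quarters of the 32-bit
    // range map to victims 0, 1, 2 and 3.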
    eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
    unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
    unsigned inc = all_coprimes_[size - 1][index];

    for (unsigned i = 0; i < size; i++) {
      eigen_plain_assert(start + victim < limit);
      Task t = thread_data_[start + victim].queue.PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= static_cast<unsigned int>(size);
      }
    }
    return Task();
  }

  // Steals work within threads belonging to the partition.
  Task LocalSteal() {
    PerThread* pt = GetPerThread();
    unsigned partition = GetStealPartition(pt->thread_id);
    // If thread steal partition is the same as global partition, there is no
    // need to go through the steal loop twice.
    if (global_steal_partition_ == partition) return Task();
    unsigned start, limit;
    DecodePartition(partition, &start, &limit);
    AssertBounds(start, limit);

    return Steal(start, limit);
  }

  // Steals work from any other thread in the pool.
  Task GlobalSteal() { return Steal(0, num_threads_); }

  // WaitForWork blocks until new work is available (returns true), or if it is
  // time to exit (returns false). Can optionally return a task to execute in t
  // (in such case t.f != nullptr on return).
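  // The waiting protocol follows EventCount's two-phase scheme used below:
  // announce intent with ec_.Prewait(), re-check the queues, and then either
  // ec_.CancelWait() (work or shutdown was found) or ec_.CommitWait(waiter)
  // to actually block until a Schedule() call notifies the event count.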
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_plain_assert(!t->f);
    // We already did best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait();
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait();
      if (cancelled_) {
        return false;
      } else {
        *t = thread_data_[victim].queue.PopBack();
        return true;
      }
    }
    // Number of blocked threads is used as termination condition.
    // If we are shutting down and all worker threads blocked without work,
    // that means we are done.
    blocked_++;
    // TODO is blocked_ required to be unsigned?
    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
      ec_.CancelWait();
      // Almost done, but need to re-check queues.
      // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
      // submits work and calls destructor (which sets done_). If we don't
      // re-check queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from queues.
        // Now other worker threads can start exiting, which is bad if the
        // work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    // We intentionally design NonEmptyQueueIndex to steal work from
    // anywhere in the queue so threads don't block in WaitForWork() forever
    // when all threads in their partition go to sleep. Steal is still local.
    const size_t size = thread_data_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!thread_data_[victim].queue.Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= static_cast<unsigned int>(size);
      }
    }
    return -1;
  }

  // StartSpinning() checks if the number of threads in the spin loop is less
  // than the allowed maximum. If so, increments the number of spinning threads
  // by one and returns true (caller must enter the spin loop). Otherwise
  // returns false, and the caller must not enter the spin loop.
  bool StartSpinning() {
    if (NumActiveThreads() > kMinActiveThreadsToStartSpinning) return false;

    uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
    for (;;) {
      SpinningState state = SpinningState::Decode(spinning);

      if ((state.num_spinning - state.num_no_notification) >= kMaxSpinningThreads) {
        return false;
      }

      // Increment the number of spinning threads.
      ++state.num_spinning;

      if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
        return true;
      }
    }
  }

  // StopSpinning() decrements the number of spinning threads by one. It also
  // checks if there were any tasks submitted into the pool without notifying
  // parked threads, and decrements the count by one. Returns true if the number
  // of tasks submitted without notification was decremented. In this case,
  // caller thread might have to call Steal() one more time.
  bool StopSpinning() {
    uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
    for (;;) {
      SpinningState state = SpinningState::Decode(spinning);

      // Decrement the number of spinning threads.
      --state.num_spinning;

      // Maybe decrement the number of tasks submitted without notification.
      bool has_no_notify_task = state.num_no_notification > 0;
      if (has_no_notify_task) --state.num_no_notification;

      if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
        return has_no_notify_task;
      }
    }
  }

  // IsNotifyParkedThreadRequired() returns true if a parked thread must be
  // notified about the newly added task. If there are threads spinning in the
  // steal loop, there is no need to unpark any of the waiting threads; the task
  // will be picked up by one of the spinning threads.
  bool IsNotifyParkedThreadRequired() {
    uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
    for (;;) {
      SpinningState state = SpinningState::Decode(spinning);

      // If the number of tasks submitted without notifying parked threads is
      // equal to the number of spinning threads, we must wake up one of the
      // parked threads.
      if (state.num_no_notification == state.num_spinning) return true;

      // Increment the number of tasks submitted without notification.
      ++state.num_no_notification;

      if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
        return false;
      }
    }
  }

  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }

  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
    static PerThread dummy;
    auto it = per_thread_map_.find(GlobalThreadIdHash());
    if (it == per_thread_map_.end()) {
      return &dummy;
    } else {
      return it->second.get();
    }
#else
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
#endif
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme)
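    // The state update is a 64-bit LCG (Knuth's MMIX multiplier); the output
    // xorshifts the old state and then right-shifts it by 22 + (current >> 61),
    // i.e. by 22 to 29 bits depending on the top 3 state bits, before
    // truncating to 32 bits.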
    return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
  }
};

typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;

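// Illustrative usage sketch (assumptions: the StlThreadEnvironment backend and
// Eigen::Barrier from Barrier.h): schedule closures on the pool and wait for
// them to finish before the pool is destroyed.
//
//   Eigen::ThreadPool pool(/*num_threads=*/4);
//   Eigen::Barrier barrier(/*count=*/100);
//   for (int i = 0; i < 100; ++i) {
//     pool.Schedule([&barrier]() {
//       // ... do some work ...
//       barrier.Notify();
//     });
//   }
//   barrier.Wait();  // Blocks until all 100 tasks have run.
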
}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H