Eigen::ThreadPoolTempl< Environment > Class Template Reference

#include <NonBlockingThreadPool.h>

+ Inheritance diagram for Eigen::ThreadPoolTempl< Environment >:

Classes

struct  PerThread
 
struct  SpinningState
 
struct  ThreadData
 

Public Types

typedef Environment::EnvThread Thread
 
typedef Environment::Task Task
 
typedef RunQueue< Task, 1024 > Queue
 

Public Member Functions

 ThreadPoolTempl (int num_threads, Environment env=Environment())
 
 ThreadPoolTempl (int num_threads, bool allow_spinning, Environment env=Environment())
 
 ~ThreadPoolTempl ()
 
void SetStealPartitions (const std::vector< std::pair< unsigned, unsigned >> &partitions)
 
void Schedule (std::function< void()> fn) EIGEN_OVERRIDE
 
void ScheduleWithHint (std::function< void()> fn, int start, int limit) override
 
void MaybeGetTask (Task *t)
 
void Cancel () EIGEN_OVERRIDE
 
int NumThreads () const EIGEN_FINAL
 
int CurrentThreadId () const EIGEN_FINAL
 
- Public Member Functions inherited from Eigen::ThreadPoolInterface
virtual ~ThreadPoolInterface ()
 

Private Member Functions

unsigned EncodePartition (unsigned start, unsigned limit)
 
void DecodePartition (unsigned val, unsigned *start, unsigned *limit)
 
void AssertBounds (int start, int end)
 
void SetStealPartition (size_t i, unsigned val)
 
unsigned GetStealPartition (int i)
 
void ComputeCoprimes (int N, MaxSizeVector< unsigned > *coprimes)
 
unsigned NumBlockedThreads () const
 
unsigned NumActiveThreads () const
 
void WorkerLoop (int thread_id)
 
Task Steal (unsigned start, unsigned limit)
 
Task LocalSteal ()
 
Task GlobalSteal ()
 
bool WaitForWork (EventCount::Waiter *waiter, Task *t)
 
int NonEmptyQueueIndex ()
 
bool StartSpinning ()
 
bool StopSpinning ()
 
bool IsNotifyParkedThreadRequired ()
 
EIGEN_STRONG_INLINE PerThread * GetPerThread ()
 

Static Private Member Functions

static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash ()
 
static EIGEN_STRONG_INLINE unsigned Rand (uint64_t *state)
 

Private Attributes

Environment env_
 
const int num_threads_
 
const bool allow_spinning_
 
const int spin_count_
 
MaxSizeVector< ThreadData > thread_data_
 
MaxSizeVector< MaxSizeVector< unsigned > > all_coprimes_
 
MaxSizeVector< EventCount::Waiter > waiters_
 
unsigned global_steal_partition_
 
std::atomic< uint64_t > spinning_state_
 
std::atomic< unsigned > blocked_
 
std::atomic< bool > done_
 
std::atomic< bool > cancelled_
 
EventCount ec_
 
std::unique_ptr< Barrier > init_barrier_
 
EIGEN_MUTEX per_thread_map_mutex_
 
std::unordered_map< uint64_t, std::unique_ptr< PerThread > > per_thread_map_
 

Static Private Attributes

static constexpr int kMaxPartitionBits = 16
 
static constexpr int kMaxThreads = 1 << kMaxPartitionBits
 
static constexpr int kMaxSpinningThreads = 1
 
static constexpr int kSpinCount = 5000
 
static constexpr int kMinActiveThreadsToStartSpinning = 4
 

Member Typedef Documentation

◆ Queue

template<typename Environment >
typedef RunQueue<Task, 1024> Eigen::ThreadPoolTempl< Environment >::Queue

◆ Task

template<typename Environment >
typedef Environment::Task Eigen::ThreadPoolTempl< Environment >::Task

◆ Thread

template<typename Environment >
typedef Environment::EnvThread Eigen::ThreadPoolTempl< Environment >::Thread

Constructor & Destructor Documentation

◆ ThreadPoolTempl() [1/2]

template<typename Environment >
Eigen::ThreadPoolTempl< Environment >::ThreadPoolTempl ( int  num_threads,
Environment  env = Environment() 
)
inline
39 : ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, Environment env=Environment())
Definition: NonBlockingThreadPool.h:39

◆ ThreadPoolTempl() [2/2]

template<typename Environment >
Eigen::ThreadPoolTempl< Environment >::ThreadPoolTempl ( int  num_threads,
bool  allow_spinning,
Environment  env = Environment() 
)
inline
42  : env_(env),
43  num_threads_(num_threads),
44  allow_spinning_(allow_spinning),
45  spin_count_(
46  // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is proportional to num_threads_ and
47  // we assume that new work is scheduled at a constant rate, so we divide `kSpinCount` by number of
48  // threads and number of spinning threads. The constant was picked based on a fair dice roll, tune it.
49  allow_spinning && num_threads > 0 ? kSpinCount / kMaxSpinningThreads / num_threads : 0),
50  thread_data_(num_threads),
51  all_coprimes_(num_threads),
52  waiters_(num_threads),
53  global_steal_partition_(EncodePartition(0, num_threads_)),
54  spinning_state_(0),
55  blocked_(0),
56  done_(false),
57  cancelled_(false),
58  ec_(waiters_) {
59  waiters_.resize(num_threads_);
60  // Calculate coprimes of all numbers [1, num_threads].
61  // Coprimes are used for random walks over all threads in Steal
62  // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
63  // a random starting thread index t and calculate num_threads - 1 subsequent
64  // indices as (t + coprime) % num_threads, we will cover all threads without
65  // repetitions (effectively getting a pseudo-random permutation of thread
66  // indices).
67  eigen_plain_assert(num_threads_ < kMaxThreads);
68  for (int i = 1; i <= num_threads_; ++i) {
69  all_coprimes_.emplace_back(i);
70  ComputeCoprimes(i, &all_coprimes_.back());
71  }
72 #ifndef EIGEN_THREAD_LOCAL
73  init_barrier_.reset(new Barrier(num_threads_));
74 #endif
75  thread_data_.resize(num_threads_);
76  for (int i = 0; i < num_threads_; i++) {
77  SetStealPartition(i, EncodePartition(0, num_threads_));
78  thread_data_[i].thread.reset(env_.CreateThread([this, i]() { WorkerLoop(i); }));
79  }
80 #ifndef EIGEN_THREAD_LOCAL
81  // Wait for workers to initialize per_thread_map_. Otherwise we might race
82  // with them in Schedule or CurrentThreadId.
83  init_barrier_->Wait();
84 #endif
85  }
#define eigen_plain_assert(condition)
Definition: Assert.h:148
int i
Definition: BiCGSTAB_step_by_step.cpp:9
const int num_threads_
Definition: NonBlockingThreadPool.h:316
static constexpr int kMaxThreads
Definition: NonBlockingThreadPool.h:225
std::atomic< unsigned > blocked_
Definition: NonBlockingThreadPool.h:324
const int spin_count_
Definition: NonBlockingThreadPool.h:318
void ComputeCoprimes(int N, MaxSizeVector< unsigned > *coprimes)
Definition: NonBlockingThreadPool.h:247
static constexpr int kSpinCount
Definition: NonBlockingThreadPool.h:268
void SetStealPartition(size_t i, unsigned val)
Definition: NonBlockingThreadPool.h:241
MaxSizeVector< ThreadData > thread_data_
Definition: NonBlockingThreadPool.h:319
const bool allow_spinning_
Definition: NonBlockingThreadPool.h:317
static constexpr int kMaxSpinningThreads
Definition: NonBlockingThreadPool.h:264
std::unique_ptr< Barrier > init_barrier_
Definition: NonBlockingThreadPool.h:329
std::atomic< uint64_t > spinning_state_
Definition: NonBlockingThreadPool.h:323
unsigned EncodePartition(unsigned start, unsigned limit)
Definition: NonBlockingThreadPool.h:227
MaxSizeVector< MaxSizeVector< unsigned > > all_coprimes_
Definition: NonBlockingThreadPool.h:320
std::atomic< bool > cancelled_
Definition: NonBlockingThreadPool.h:326
EventCount ec_
Definition: NonBlockingThreadPool.h:327
std::atomic< bool > done_
Definition: NonBlockingThreadPool.h:325
unsigned global_steal_partition_
Definition: NonBlockingThreadPool.h:322
MaxSizeVector< EventCount::Waiter > waiters_
Definition: NonBlockingThreadPool.h:321
Environment env_
Definition: NonBlockingThreadPool.h:315

References Eigen::ThreadPoolTempl< Environment >::all_coprimes_, Eigen::ThreadPoolTempl< Environment >::ComputeCoprimes(), eigen_plain_assert, Eigen::ThreadPoolTempl< Environment >::EncodePartition(), Eigen::ThreadPoolTempl< Environment >::env_, i, Eigen::ThreadPoolTempl< Environment >::init_barrier_, Eigen::ThreadPoolTempl< Environment >::kMaxThreads, Eigen::ThreadPoolTempl< Environment >::num_threads_, Eigen::ThreadPoolTempl< Environment >::SetStealPartition(), Eigen::ThreadPoolTempl< Environment >::thread_data_, and Eigen::ThreadPoolTempl< Environment >::waiters_.

◆ ~ThreadPoolTempl()

template<typename Environment >
Eigen::ThreadPoolTempl< Environment >::~ThreadPoolTempl ( )
inline
87  {
88  done_ = true;
89 
90  // Now if all threads block without work, they will start exiting.
91  // But note that threads can continue to work arbitrary long,
92  // block, submit new work, unblock and otherwise live full life.
93  if (!cancelled_) {
94  ec_.Notify(true);
95  } else {
96  // Since we were cancelled, there might be entries in the queues.
97  // Empty them to prevent their destructor from asserting.
98  for (size_t i = 0; i < thread_data_.size(); i++) {
99  thread_data_[i].queue.Flush();
100  }
101  }
102  // Join threads explicitly (by destroying) to avoid destruction order within
103  // this class.
104  for (size_t i = 0; i < thread_data_.size(); ++i) thread_data_[i].thread.reset();
105  }
void Notify(bool notifyAll)
Definition: EventCount.h:127

References Eigen::ThreadPoolTempl< Environment >::cancelled_, Eigen::ThreadPoolTempl< Environment >::done_, Eigen::ThreadPoolTempl< Environment >::ec_, i, Eigen::EventCount::Notify(), and Eigen::ThreadPoolTempl< Environment >::thread_data_.

Member Function Documentation

◆ AssertBounds()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::AssertBounds ( int  start,
int  end 
)
inlineprivate
235  {
236  eigen_plain_assert(start >= 0);
237  eigen_plain_assert(start < end); // non-zero sized partition
238  eigen_plain_assert(end <= num_threads_);
239  }
static constexpr lastp1_t end
Definition: IndexedViewHelper.h:79
void start(const unsigned &i)
(Re-)start i-th timer
Definition: oomph_utilities.cc:243

References eigen_plain_assert, Eigen::placeholders::end, Eigen::ThreadPoolTempl< Environment >::num_threads_, and oomph::CumulativeTimings::start().

Referenced by Eigen::ThreadPoolTempl< Environment >::LocalSteal(), and Eigen::ThreadPoolTempl< Environment >::SetStealPartitions().

◆ Cancel()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::Cancel ( )
inlinevirtual

Reimplemented from Eigen::ThreadPoolInterface.

190  {
191  cancelled_ = true;
192  done_ = true;
193 
194  // Let each thread know it's been cancelled.
195 #ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
196  for (size_t i = 0; i < thread_data_.size(); i++) {
197  thread_data_[i].thread->OnCancel();
198  }
199 #endif
200 
201  // Wake up the threads without work to let them exit on their own.
202  ec_.Notify(true);
203  }

References Eigen::ThreadPoolTempl< Environment >::cancelled_, Eigen::ThreadPoolTempl< Environment >::done_, Eigen::ThreadPoolTempl< Environment >::ec_, i, Eigen::EventCount::Notify(), and Eigen::ThreadPoolTempl< Environment >::thread_data_.

Referenced by test_cancel().

◆ ComputeCoprimes()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::ComputeCoprimes ( int  N,
MaxSizeVector< unsigned > *  coprimes 
)
inlineprivate
247  {
248  for (int i = 1; i <= N; i++) {
249  unsigned a = i;
250  unsigned b = N;
251  // If GCD(a, b) == 1, then a and b are coprimes.
252  while (b != 0) {
253  unsigned tmp = a;
254  a = b;
255  b = tmp % b;
256  }
257  if (a == 1) {
258  coprimes->push_back(i);
259  }
260  }
261  }
Scalar * b
Definition: benchVecAdd.cpp:17
@ N
Definition: constructor.cpp:22
const Scalar * a
Definition: level2_cplx_impl.h:32
Eigen::Matrix< Scalar, Dynamic, Dynamic, ColMajor > tmp
Definition: level3_impl.h:365

References a, b, i, N, Eigen::MaxSizeVector< T >::push_back(), and tmp.

Referenced by Eigen::ThreadPoolTempl< Environment >::ThreadPoolTempl().

◆ CurrentThreadId()

template<typename Environment >
int Eigen::ThreadPoolTempl< Environment >::CurrentThreadId ( ) const
inlinevirtual

Implements Eigen::ThreadPoolInterface.

207  {
208  const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
209  if (pt->pool == this) {
210  return pt->thread_id;
211  } else {
212  return -1;
213  }
214  }
EIGEN_STRONG_INLINE PerThread * GetPerThread()
Definition: NonBlockingThreadPool.h:555
int thread_id
Definition: NonBlockingThreadPool.h:29

References Eigen::ThreadPoolTempl< Environment >::GetPerThread(), Eigen::ThreadPoolTempl< Environment >::PerThread::pool, and Eigen::ThreadPoolTempl< Environment >::PerThread::thread_id.

Referenced by test_parallelism(), and test_pool_partitions().

◆ DecodePartition()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::DecodePartition ( unsigned  val,
unsigned *  start,
unsigned *  limit 
)
inlineprivate
229  {
230  *limit = val & (kMaxThreads - 1);
231  val >>= kMaxPartitionBits;
232  *start = val;
233  }
static constexpr int kMaxPartitionBits
Definition: NonBlockingThreadPool.h:224
val
Definition: calibrate.py:119

References Eigen::ThreadPoolTempl< Environment >::kMaxPartitionBits, Eigen::ThreadPoolTempl< Environment >::kMaxThreads, oomph::CumulativeTimings::start(), and calibrate::val.

Referenced by Eigen::ThreadPoolTempl< Environment >::LocalSteal().

◆ EncodePartition()

◆ GetPerThread()

template<typename Environment >
EIGEN_STRONG_INLINE PerThread* Eigen::ThreadPoolTempl< Environment >::GetPerThread ( )
inlineprivate
555  {
556 #ifndef EIGEN_THREAD_LOCAL
557  static PerThread dummy;
558  auto it = per_thread_map_.find(GlobalThreadIdHash());
559  if (it == per_thread_map_.end()) {
560  return &dummy;
561  } else {
562  return it->second.get();
563  }
564 #else
565  EIGEN_THREAD_LOCAL PerThread per_thread_;
566  PerThread* pt = &per_thread_;
567  return pt;
568 #endif
569  }
std::unordered_map< uint64_t, std::unique_ptr< PerThread > > per_thread_map_
Definition: NonBlockingThreadPool.h:331
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash()
Definition: NonBlockingThreadPool.h:551

References Eigen::ThreadPoolTempl< Environment >::GlobalThreadIdHash(), and Eigen::ThreadPoolTempl< Environment >::per_thread_map_.

Referenced by Eigen::ThreadPoolTempl< Environment >::CurrentThreadId(), Eigen::ThreadPoolTempl< Environment >::LocalSteal(), Eigen::ThreadPoolTempl< Environment >::MaybeGetTask(), Eigen::ThreadPoolTempl< Environment >::NonEmptyQueueIndex(), Eigen::ThreadPoolTempl< Environment >::ScheduleWithHint(), Eigen::ThreadPoolTempl< Environment >::Steal(), and Eigen::ThreadPoolTempl< Environment >::WorkerLoop().

◆ GetStealPartition()

template<typename Environment >
unsigned Eigen::ThreadPoolTempl< Environment >::GetStealPartition ( int  i)
inlineprivate
245 { return thread_data_[i].steal_partition.load(std::memory_order_relaxed); }

References i, and Eigen::ThreadPoolTempl< Environment >::thread_data_.

Referenced by Eigen::ThreadPoolTempl< Environment >::LocalSteal().

◆ GlobalSteal()

template<typename Environment >
Task Eigen::ThreadPoolTempl< Environment >::GlobalSteal ( )
inlineprivate
408 { return Steal(0, num_threads_); }
Task Steal(unsigned start, unsigned limit)
Definition: NonBlockingThreadPool.h:368

References Eigen::ThreadPoolTempl< Environment >::num_threads_, and Eigen::ThreadPoolTempl< Environment >::Steal().

Referenced by Eigen::ThreadPoolTempl< Environment >::MaybeGetTask().

◆ GlobalThreadIdHash()

template<typename Environment >
static EIGEN_STRONG_INLINE uint64_t Eigen::ThreadPoolTempl< Environment >::GlobalThreadIdHash ( )
inlinestaticprivate
551  {
552  return std::hash<std::thread::id>()(std::this_thread::get_id());
553  }

Referenced by Eigen::ThreadPoolTempl< Environment >::GetPerThread(), and Eigen::ThreadPoolTempl< Environment >::WorkerLoop().

◆ IsNotifyParkedThreadRequired()

template<typename Environment >
bool Eigen::ThreadPoolTempl< Environment >::IsNotifyParkedThreadRequired ( )
inlineprivate
532  {
533  uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
534  for (;;) {
535  SpinningState state = SpinningState::Decode(spinning);
536 
537  // If the number of tasks submitted without notifying parked threads is
538  // equal to the number of spinning threads, we must wake up one of the
539  // parked threads.
540  if (state.num_no_notification == state.num_spinning) return true;
541 
542  // Increment the number of tasks submitted without notification.
543  ++state.num_no_notification;
544 
545  if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
546  return false;
547  }
548  }
549  }
std::uint64_t uint64_t
Definition: Meta.h:42
static SpinningState Decode(uint64_t state)
Definition: NonBlockingThreadPool.h:300

References Eigen::ThreadPoolTempl< Environment >::SpinningState::Decode(), Eigen::ThreadPoolTempl< Environment >::SpinningState::Encode(), Eigen::ThreadPoolTempl< Environment >::SpinningState::num_no_notification, Eigen::ThreadPoolTempl< Environment >::SpinningState::num_spinning, and Eigen::ThreadPoolTempl< Environment >::spinning_state_.

Referenced by Eigen::ThreadPoolTempl< Environment >::ScheduleWithHint().

◆ LocalSteal()

template<typename Environment >
Task Eigen::ThreadPoolTempl< Environment >::LocalSteal ( )
inlineprivate
394  {
395  PerThread* pt = GetPerThread();
396  unsigned partition = GetStealPartition(pt->thread_id);
397  // If thread steal partition is the same as global partition, there is no
398  // need to go through the steal loop twice.
399  if (global_steal_partition_ == partition) return Task();
400  unsigned start, limit;
401  DecodePartition(partition, &start, &limit);
402  AssertBounds(start, limit);
403 
404  return Steal(start, limit);
405  }
void AssertBounds(int start, int end)
Definition: NonBlockingThreadPool.h:235
Environment::Task Task
Definition: NonBlockingThreadPool.h:22
void DecodePartition(unsigned val, unsigned *start, unsigned *limit)
Definition: NonBlockingThreadPool.h:229
unsigned GetStealPartition(int i)
Definition: NonBlockingThreadPool.h:245

References Eigen::ThreadPoolTempl< Environment >::AssertBounds(), Eigen::ThreadPoolTempl< Environment >::DecodePartition(), Eigen::ThreadPoolTempl< Environment >::GetPerThread(), Eigen::ThreadPoolTempl< Environment >::GetStealPartition(), Eigen::ThreadPoolTempl< Environment >::global_steal_partition_, oomph::CumulativeTimings::start(), Eigen::ThreadPoolTempl< Environment >::Steal(), and Eigen::ThreadPoolTempl< Environment >::PerThread::thread_id.

Referenced by Eigen::ThreadPoolTempl< Environment >::MaybeGetTask().

◆ MaybeGetTask()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::MaybeGetTask ( Task *  t)
inline
157  {
158  PerThread* pt = GetPerThread();
159  Queue& q = thread_data_[pt->thread_id].queue;
160  *t = q.PopFront();
161  if (t->f) return;
162  if (num_threads_ == 1) {
163  // For num_threads_ == 1 there is no point in going through the expensive
164  // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
165  // victim queues it might reverse the order in which ops are executed
166  // compared to the order in which they are scheduled, which tends to be
167  // counter-productive for the types of I/O workloads single thread pools
168  // tend to be used for.
169  for (int i = 0; i < spin_count_ && !t->f; ++i) *t = q.PopFront();
170  } else {
171  if (EIGEN_PREDICT_FALSE(!t->f)) *t = LocalSteal();
172  if (EIGEN_PREDICT_FALSE(!t->f)) *t = GlobalSteal();
173  if (EIGEN_PREDICT_FALSE(!t->f)) {
174  if (allow_spinning_ && StartSpinning()) {
175  for (int i = 0; i < spin_count_ && !t->f; ++i) *t = GlobalSteal();
176  // Notify `spinning_state_` that we are no longer spinning.
177  bool has_no_notify_task = StopSpinning();
178  // If a task was submitted to the queue without a call to
179  // `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
180  // false), and we didn't steal anything above, we must try to
181  // steal one more time, to make sure that this task will be
182  // executed. We will not necessarily find it, because it might
183  // have been already stolen by some other thread.
184  if (has_no_notify_task && !t->f) *t = q.PopFront();
185  }
186  }
187  }
188  }
#define EIGEN_PREDICT_FALSE(x)
Definition: Macros.h:1179
Task LocalSteal()
Definition: NonBlockingThreadPool.h:394
bool StartSpinning()
Definition: NonBlockingThreadPool.h:485
RunQueue< Task, 1024 > Queue
Definition: NonBlockingThreadPool.h:23
bool StopSpinning()
Definition: NonBlockingThreadPool.h:510
Task GlobalSteal()
Definition: NonBlockingThreadPool.h:408
EIGEN_DEVICE_FUNC const Scalar & q
Definition: SpecialFunctionsImpl.h:2019
t
Definition: plotPSD.py:36

References Eigen::ThreadPoolTempl< Environment >::allow_spinning_, EIGEN_PREDICT_FALSE, Eigen::ThreadPoolTempl< Environment >::GetPerThread(), Eigen::ThreadPoolTempl< Environment >::GlobalSteal(), i, Eigen::ThreadPoolTempl< Environment >::LocalSteal(), Eigen::ThreadPoolTempl< Environment >::num_threads_, Eigen::numext::q, Eigen::ThreadPoolTempl< Environment >::spin_count_, Eigen::ThreadPoolTempl< Environment >::StartSpinning(), Eigen::ThreadPoolTempl< Environment >::StopSpinning(), plotPSD::t, Eigen::ThreadPoolTempl< Environment >::thread_data_, and Eigen::ThreadPoolTempl< Environment >::PerThread::thread_id.

Referenced by Eigen::ForkJoinScheduler::ForkJoin(), and Eigen::ThreadPoolTempl< Environment >::WorkerLoop().

◆ NonEmptyQueueIndex()

template<typename Environment >
int Eigen::ThreadPoolTempl< Environment >::NonEmptyQueueIndex ( )
inlineprivate
460  {
461  PerThread* pt = GetPerThread();
462  // We intentionally design NonEmptyQueueIndex to steal work from
463  // anywhere in the queue so threads don't block in WaitForWork() forever
464  // when all threads in their partition go to sleep. Steal is still local.
465  const size_t size = thread_data_.size();
466  unsigned r = Rand(&pt->rand);
467  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
468  unsigned victim = r % size;
469  for (unsigned i = 0; i < size; i++) {
470  if (!thread_data_[victim].queue.Empty()) {
471  return victim;
472  }
473  victim += inc;
474  if (victim >= size) {
475  victim -= static_cast<unsigned int>(size);
476  }
477  }
478  return -1;
479  }
Scalar Scalar int size
Definition: benchVecAdd.cpp:17
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t *state)
Definition: NonBlockingThreadPool.h:571
r
Definition: UniformPSDSelfTest.py:20

References Eigen::ThreadPoolTempl< Environment >::all_coprimes_, Eigen::ThreadPoolTempl< Environment >::GetPerThread(), i, UniformPSDSelfTest::r, Eigen::ThreadPoolTempl< Environment >::PerThread::rand, Eigen::ThreadPoolTempl< Environment >::Rand(), size, and Eigen::ThreadPoolTempl< Environment >::thread_data_.

Referenced by Eigen::ThreadPoolTempl< Environment >::WaitForWork().

◆ NumActiveThreads()

template<typename Environment >
unsigned Eigen::ThreadPoolTempl< Environment >::NumActiveThreads ( ) const
inlineprivate

◆ NumBlockedThreads()

template<typename Environment >
unsigned Eigen::ThreadPoolTempl< Environment >::NumBlockedThreads ( ) const
inlineprivate
334 { return blocked_.load(); }

References Eigen::ThreadPoolTempl< Environment >::blocked_.

◆ NumThreads()

template<typename Environment >
int Eigen::ThreadPoolTempl< Environment >::NumThreads ( ) const
inlinevirtual

◆ Rand()

template<typename Environment >
static EIGEN_STRONG_INLINE unsigned Eigen::ThreadPoolTempl< Environment >::Rand ( uint64_t *  state)
inlinestaticprivate
571  {
572  uint64_t current = *state;
573  // Update the internal state
574  *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
575  // Generate the random output (using the PCG-XSH-RS scheme)
576  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
577  }

Referenced by Eigen::ThreadPoolTempl< Environment >::NonEmptyQueueIndex(), Eigen::ThreadPoolTempl< Environment >::ScheduleWithHint(), and Eigen::ThreadPoolTempl< Environment >::Steal().

◆ Schedule()

◆ ScheduleWithHint()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::ScheduleWithHint ( std::function< void()>  fn,
int  start,
int  limit 
)
inlineoverridevirtual

Reimplemented from Eigen::ThreadPoolInterface.

122  {
123  Task t = env_.CreateTask(std::move(fn));
124  PerThread* pt = GetPerThread();
125  if (pt->pool == this) {
126  // Worker thread of this pool, push onto the thread's queue.
127  Queue& q = thread_data_[pt->thread_id].queue;
128  t = q.PushFront(std::move(t));
129  } else {
130  // A free-standing thread (or worker of another pool), push onto a random
131  // queue.
132  eigen_plain_assert(start < limit);
133  eigen_plain_assert(limit <= num_threads_);
134  int num_queues = limit - start;
135  int rnd = Rand(&pt->rand) % num_queues;
136  eigen_plain_assert(start + rnd < limit);
137  Queue& q = thread_data_[start + rnd].queue;
138  t = q.PushBack(std::move(t));
139  }
140  // Note: below we touch this after making w available to worker threads.
141  // Strictly speaking, this can lead to a racy-use-after-free. Consider that
142  // Schedule is called from a thread that is neither main thread nor a worker
143  // thread of this pool. Then, execution of w directly or indirectly
144  // completes overall computations, which in turn leads to destruction of
145  // this. We expect that such scenario is prevented by program, that is,
146  // this is kept alive while any threads can potentially be in Schedule.
147  if (!t.f) {
148  if (IsNotifyParkedThreadRequired()) {
149  ec_.Notify(false);
150  }
151  } else {
152  env_.ExecuteTask(t); // Push failed, execute directly.
153  }
154  }
bool IsNotifyParkedThreadRequired()
Definition: NonBlockingThreadPool.h:532

References Eigen::ThreadPoolTempl< Environment >::ec_, eigen_plain_assert, Eigen::ThreadPoolTempl< Environment >::env_, Eigen::ThreadPoolTempl< Environment >::GetPerThread(), Eigen::ThreadPoolTempl< Environment >::IsNotifyParkedThreadRequired(), Eigen::EventCount::Notify(), Eigen::ThreadPoolTempl< Environment >::num_threads_, Eigen::ThreadPoolTempl< Environment >::PerThread::pool, Eigen::numext::q, Eigen::ThreadPoolTempl< Environment >::PerThread::rand, Eigen::ThreadPoolTempl< Environment >::Rand(), oomph::CumulativeTimings::start(), plotPSD::t, Eigen::ThreadPoolTempl< Environment >::thread_data_, and Eigen::ThreadPoolTempl< Environment >::PerThread::thread_id.

Referenced by Eigen::ThreadPoolTempl< Environment >::Schedule(), and test_pool_partitions().

◆ SetStealPartition()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::SetStealPartition ( size_t  i,
unsigned  val 
)
inlineprivate

◆ SetStealPartitions()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::SetStealPartitions ( const std::vector< std::pair< unsigned, unsigned >> &  partitions)
inline
107  {
108  eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));
109 
110  // Pass this information to each thread queue.
111  for (int i = 0; i < num_threads_; i++) {
112  const auto& pair = partitions[i];
113  unsigned start = pair.first, end = pair.second;
114  AssertBounds(start, end);
115  unsigned val = EncodePartition(start, end);
116  SetStealPartition(i, val);
117  }
118  }

References Eigen::ThreadPoolTempl< Environment >::AssertBounds(), eigen_plain_assert, Eigen::ThreadPoolTempl< Environment >::EncodePartition(), Eigen::placeholders::end, i, Eigen::ThreadPoolTempl< Environment >::num_threads_, Eigen::ThreadPoolTempl< Environment >::SetStealPartition(), oomph::CumulativeTimings::start(), and calibrate::val.

Referenced by test_pool_partitions().

◆ StartSpinning()

template<typename Environment >
bool Eigen::ThreadPoolTempl< Environment >::StartSpinning ( )
inlineprivate
485  {
487 
488  uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
489  for (;;) {
490  SpinningState state = SpinningState::Decode(spinning);
491 
492  if ((state.num_spinning - state.num_no_notification) >= kMaxSpinningThreads) {
493  return false;
494  }
495 
496  // Increment the number of spinning threads.
497  ++state.num_spinning;
498 
499  if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
500  return true;
501  }
502  }
503  }
unsigned NumActiveThreads() const
Definition: NonBlockingThreadPool.h:335
static constexpr int kMinActiveThreadsToStartSpinning
Definition: NonBlockingThreadPool.h:278

References Eigen::ThreadPoolTempl< Environment >::SpinningState::Decode(), Eigen::ThreadPoolTempl< Environment >::SpinningState::Encode(), Eigen::ThreadPoolTempl< Environment >::kMaxSpinningThreads, Eigen::ThreadPoolTempl< Environment >::kMinActiveThreadsToStartSpinning, Eigen::ThreadPoolTempl< Environment >::SpinningState::num_no_notification, Eigen::ThreadPoolTempl< Environment >::SpinningState::num_spinning, Eigen::ThreadPoolTempl< Environment >::NumActiveThreads(), and Eigen::ThreadPoolTempl< Environment >::spinning_state_.

Referenced by Eigen::ThreadPoolTempl< Environment >::MaybeGetTask().

◆ Steal()

template<typename Environment >
Task Eigen::ThreadPoolTempl< Environment >::Steal ( unsigned  start,
unsigned  limit 
)
inlineprivate
368  {
369  PerThread* pt = GetPerThread();
370  const size_t size = limit - start;
371  unsigned r = Rand(&pt->rand);
372  // Reduce r into [0, size) range, this utilizes trick from
373  // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
374  eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
375  unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
376  unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
377  unsigned inc = all_coprimes_[size - 1][index];
378 
379  for (unsigned i = 0; i < size; i++) {
380  eigen_plain_assert(start + victim < limit);
381  Task t = thread_data_[start + victim].queue.PopBack();
382  if (t.f) {
383  return t;
384  }
385  victim += inc;
386  if (victim >= size) {
387  victim -= static_cast<unsigned int>(size);
388  }
389  }
390  return Task();
391  }

References Eigen::ThreadPoolTempl< Environment >::all_coprimes_, eigen_plain_assert, Eigen::ThreadPoolTempl< Environment >::GetPerThread(), i, UniformPSDSelfTest::r, Eigen::ThreadPoolTempl< Environment >::PerThread::rand, Eigen::ThreadPoolTempl< Environment >::Rand(), size, oomph::CumulativeTimings::start(), plotPSD::t, and Eigen::ThreadPoolTempl< Environment >::thread_data_.

Referenced by Eigen::ThreadPoolTempl< Environment >::GlobalSteal(), and Eigen::ThreadPoolTempl< Environment >::LocalSteal().

◆ StopSpinning()

template<typename Environment >
bool Eigen::ThreadPoolTempl< Environment >::StopSpinning ( )
inlineprivate
510  {
511  uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
512  for (;;) {
513  SpinningState state = SpinningState::Decode(spinning);
514 
515  // Decrement the number of spinning threads.
516  --state.num_spinning;
517 
518  // Maybe decrement the number of tasks submitted without notification.
519  bool has_no_notify_task = state.num_no_notification > 0;
520  if (has_no_notify_task) --state.num_no_notification;
521 
522  if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
523  return has_no_notify_task;
524  }
525  }
526  }

References Eigen::ThreadPoolTempl< Environment >::SpinningState::Decode(), Eigen::ThreadPoolTempl< Environment >::SpinningState::Encode(), Eigen::ThreadPoolTempl< Environment >::SpinningState::num_no_notification, Eigen::ThreadPoolTempl< Environment >::SpinningState::num_spinning, and Eigen::ThreadPoolTempl< Environment >::spinning_state_.

Referenced by Eigen::ThreadPoolTempl< Environment >::MaybeGetTask().

◆ WaitForWork()

template<typename Environment >
bool Eigen::ThreadPoolTempl< Environment >::WaitForWork ( EventCount::Waiter *  waiter,
Task *  t 
)
inlineprivate
413  {
414  eigen_plain_assert(!t->f);
415  // We already did best-effort emptiness check in Steal, so prepare for
416  // blocking.
417  ec_.Prewait();
418  // Now do a reliable emptiness check.
419  int victim = NonEmptyQueueIndex();
420  if (victim != -1) {
421  ec_.CancelWait();
422  if (cancelled_) {
423  return false;
424  } else {
425  *t = thread_data_[victim].queue.PopBack();
426  return true;
427  }
428  }
429  // Number of blocked threads is used as termination condition.
430  // If we are shutting down and all worker threads blocked without work,
431  // then we are done.
432  blocked_++;
433  // TODO is blocked_ required to be unsigned?
434  if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
435  ec_.CancelWait();
436  // Almost done, but need to re-check queues.
437  // Consider that all queues are empty and all worker threads are preempted
438  // right after incrementing blocked_ above. Now a free-standing thread
439  // submits work and calls destructor (which sets done_). If we don't
440  // re-check queues, we will exit leaving the work unexecuted.
441  if (NonEmptyQueueIndex() != -1) {
442  // Note: we must not pop from queues before we decrement blocked_,
443  // otherwise the following scenario is possible. Consider that instead
444  // of checking for emptiness we popped the only element from queues.
445  // Now other worker threads can start exiting, which is bad if the
446  // work item submits other work. So we just check emptiness here,
447  // which ensures that all worker threads exit at the same time.
448  blocked_--;
449  return true;
450  }
451  // Reached stable termination state.
452  ec_.Notify(true);
453  return false;
454  }
455  ec_.CommitWait(waiter);
456  blocked_--;
457  return true;
458  }
void CommitWait(Waiter *w)
Definition: EventCount.h:82
void CancelWait()
Definition: EventCount.h:110
void Prewait()
Definition: EventCount.h:71
int NonEmptyQueueIndex()
Definition: NonBlockingThreadPool.h:460

References Eigen::ThreadPoolTempl< Environment >::blocked_, Eigen::ThreadPoolTempl< Environment >::cancelled_, Eigen::EventCount::CancelWait(), Eigen::EventCount::CommitWait(), Eigen::ThreadPoolTempl< Environment >::done_, Eigen::ThreadPoolTempl< Environment >::ec_, eigen_plain_assert, Eigen::ThreadPoolTempl< Environment >::NonEmptyQueueIndex(), Eigen::EventCount::Notify(), Eigen::ThreadPoolTempl< Environment >::num_threads_, Eigen::EventCount::Prewait(), and Eigen::ThreadPoolTempl< Environment >::thread_data_.

Referenced by Eigen::ThreadPoolTempl< Environment >::WorkerLoop().

◆ WorkerLoop()

template<typename Environment >
void Eigen::ThreadPoolTempl< Environment >::WorkerLoop ( int  thread_id)
inline private
338  {
339 #ifndef EIGEN_THREAD_LOCAL
340  std::unique_ptr<PerThread> new_pt(new PerThread());
341  per_thread_map_mutex_.lock();
342  bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
343  eigen_plain_assert(insertOK);
344  EIGEN_UNUSED_VARIABLE(insertOK);
345  per_thread_map_mutex_.unlock();
346  init_barrier_->Notify();
347  init_barrier_->Wait();
348 #endif
349  PerThread* pt = GetPerThread();
350  pt->pool = this;
351  pt->rand = GlobalThreadIdHash();
352  pt->thread_id = thread_id;
353  Task t;
354  while (!cancelled_.load(std::memory_order_relaxed)) {
355  MaybeGetTask(&t);
356  // If we still don't have a task, wait for one. Return if thread pool is
357  // in cancelled state.
358  if (EIGEN_PREDICT_FALSE(!t.f)) {
359  EventCount::Waiter* waiter = &waiters_[pt->thread_id];
360  if (!WaitForWork(waiter, &t)) return;
361  }
362  if (EIGEN_PREDICT_TRUE(t.f)) env_.ExecuteTask(t);
363  }
364  }
#define EIGEN_PREDICT_TRUE(x)
Definition: Macros.h:1180
#define EIGEN_UNUSED_VARIABLE(var)
Definition: Macros.h:966
bool WaitForWork(EventCount::Waiter *waiter, Task *t)
Definition: NonBlockingThreadPool.h:413
EIGEN_MUTEX per_thread_map_mutex_
Definition: NonBlockingThreadPool.h:330
void MaybeGetTask(Task *t)
Definition: NonBlockingThreadPool.h:157

References Eigen::ThreadPoolTempl< Environment >::cancelled_, eigen_plain_assert, EIGEN_PREDICT_FALSE, EIGEN_PREDICT_TRUE, EIGEN_UNUSED_VARIABLE, Eigen::ThreadPoolTempl< Environment >::env_, Eigen::ThreadPoolTempl< Environment >::GetPerThread(), Eigen::ThreadPoolTempl< Environment >::GlobalThreadIdHash(), Eigen::ThreadPoolTempl< Environment >::init_barrier_, Eigen::ThreadPoolTempl< Environment >::MaybeGetTask(), Eigen::ThreadPoolTempl< Environment >::per_thread_map_, Eigen::ThreadPoolTempl< Environment >::per_thread_map_mutex_, Eigen::ThreadPoolTempl< Environment >::PerThread::pool, Eigen::ThreadPoolTempl< Environment >::PerThread::rand, Eigen::ThreadPoolTempl< Environment >::PerThread::thread_id, Eigen::ThreadPoolTempl< Environment >::waiters_, and Eigen::ThreadPoolTempl< Environment >::WaitForWork().

Member Data Documentation

◆ all_coprimes_

◆ allow_spinning_

template<typename Environment >
const bool Eigen::ThreadPoolTempl< Environment >::allow_spinning_
private

◆ blocked_

◆ cancelled_

◆ done_

◆ ec_

◆ env_

◆ global_steal_partition_

template<typename Environment >
unsigned Eigen::ThreadPoolTempl< Environment >::global_steal_partition_
private

◆ init_barrier_

template<typename Environment >
std::unique_ptr<Barrier> Eigen::ThreadPoolTempl< Environment >::init_barrier_
private

◆ kMaxPartitionBits

template<typename Environment >
constexpr int Eigen::ThreadPoolTempl< Environment >::kMaxPartitionBits = 16
static constexpr private

◆ kMaxSpinningThreads

template<typename Environment >
constexpr int Eigen::ThreadPoolTempl< Environment >::kMaxSpinningThreads = 1
static constexpr private

◆ kMaxThreads

template<typename Environment >
constexpr int Eigen::ThreadPoolTempl< Environment >::kMaxThreads = 1 << kMaxPartitionBits
static constexpr private

◆ kMinActiveThreadsToStartSpinning

template<typename Environment >
constexpr int Eigen::ThreadPoolTempl< Environment >::kMinActiveThreadsToStartSpinning = 4
static constexpr private

◆ kSpinCount

template<typename Environment >
constexpr int Eigen::ThreadPoolTempl< Environment >::kSpinCount = 5000
static constexpr private

◆ num_threads_

◆ per_thread_map_

template<typename Environment >
std::unordered_map<uint64_t, std::unique_ptr<PerThread> > Eigen::ThreadPoolTempl< Environment >::per_thread_map_
private

◆ per_thread_map_mutex_

template<typename Environment >
EIGEN_MUTEX Eigen::ThreadPoolTempl< Environment >::per_thread_map_mutex_
private

◆ spin_count_

template<typename Environment >
const int Eigen::ThreadPoolTempl< Environment >::spin_count_
private

◆ spinning_state_

◆ thread_data_

◆ waiters_


The documentation for this class was generated from the following file: