CoreThreadPoolDevice.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2023 Charlie Schlosser <cs.schlosser@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CORE_THREAD_POOL_DEVICE_H
11 #define EIGEN_CORE_THREAD_POOL_DEVICE_H
12 
13 namespace Eigen {
14 
15 // CoreThreadPoolDevice provides an easy-to-understand Device for parallelizing Eigen Core expressions with a
16 // ThreadPool. Expressions are recursively split into evenly sized chunks; the depth of the split is chosen up front
17 // from the estimated evaluation cost, so that no more tasks are created than the amount of work can justify.
18 /*
19                 a
20                / \
21               /   \
22              /     \
23             /       \
24            /         \
25           /           \
26          /             \
27         a               e
28        / \             / \
29       /   \           /   \
30      /     \         /     \
31     a       c       e       g
32    / \     / \     / \     / \
33   /   \   /   \   /   \   /   \
34  a     b c     d e     f g     h
35 */
36 // Each task descends the binary tree to the left: at every level it hands the right subtree to the thread pool as a
37 // new task and continues with the left subtree. This ensures that work is evenly distributed to the thread pool as
38 // quickly as possible and minimizes the number of tasks created during the evaluation. Consider an expression that is
39 // divided into 8 chunks. The primary task 'a' creates tasks 'e', 'c', and 'b', and executes its own portion of the
40 // expression at the bottom of the tree. Likewise, task 'e' creates tasks 'g' and 'f', and executes its own portion.
41 
43  using Task = std::function<void()>;
44  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool& pool, float threadCostThreshold = 3e-5f)
45  : m_pool(pool) {
46  eigen_assert(threadCostThreshold >= 0.0f && "threadCostThreshold must be non-negative");
47  m_costFactor = threadCostThreshold;
48  }
49 
50  template <int PacketSize>
52  eigen_assert(cost >= 0.0f && "cost must be non-negative");
53  Index numOps = size / PacketSize;
54  int actualThreads = numOps < m_pool.NumThreads() ? static_cast<int>(numOps) : m_pool.NumThreads();
55  float totalCost = static_cast<float>(numOps) * cost;
56  float idealThreads = totalCost * m_costFactor;
57  if (idealThreads < static_cast<float>(actualThreads)) {
58  idealThreads = numext::maxi(idealThreads, 1.0f);
59  actualThreads = numext::mini(actualThreads, static_cast<int>(idealThreads));
60  }
61  int maxLevel = internal::log2_ceil(actualThreads);
62  return maxLevel;
63  }
64 
// parallelForImpl is force-inlined on every compiler except MSVC proper (clang-cl still inlines it),
// because MSVC does not cope well with inlining it.
#if !EIGEN_COMP_MSVC || EIGEN_COMP_CLANG
#define EIGEN_PARALLEL_FOR_INLINE EIGEN_STRONG_INLINE
#else
#define EIGEN_PARALLEL_FOR_INLINE
#endif
71 
72  template <typename UnaryFunctor, int PacketSize>
74  Barrier& barrier, int level) {
75  while (level > 0) {
76  level--;
77  Index size = end - begin;
78  eigen_assert(size % PacketSize == 0 && "this function assumes size is a multiple of PacketSize");
79  Index mid = begin + numext::round_down(size >> 1, PacketSize);
80  Task right = [this, mid, end, &f, &barrier, level]() {
81  parallelForImpl<UnaryFunctor, PacketSize>(mid, end, f, barrier, level);
82  };
83  m_pool.Schedule(std::move(right));
84  end = mid;
85  }
86  for (Index i = begin; i < end; i += PacketSize) f(i);
87  barrier.Notify();
88  }
89 
90  template <typename BinaryFunctor, int PacketSize>
92  Index innerEnd, BinaryFunctor& f, Barrier& barrier,
93  int level) {
94  while (level > 0) {
95  level--;
96  Index outerSize = outerEnd - outerBegin;
97  if (outerSize > 1) {
98  Index outerMid = outerBegin + (outerSize >> 1);
99  Task right = [this, &f, &barrier, outerMid, outerEnd, innerBegin, innerEnd, level]() {
100  parallelForImpl<BinaryFunctor, PacketSize>(outerMid, outerEnd, innerBegin, innerEnd, f, barrier, level);
101  };
102  m_pool.Schedule(std::move(right));
103  outerEnd = outerMid;
104  } else {
105  Index innerSize = innerEnd - innerBegin;
106  eigen_assert(innerSize % PacketSize == 0 && "this function assumes innerSize is a multiple of PacketSize");
107  Index innerMid = innerBegin + numext::round_down(innerSize >> 1, PacketSize);
108  Task right = [this, &f, &barrier, outerBegin, outerEnd, innerMid, innerEnd, level]() {
109  parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerMid, innerEnd, f, barrier, level);
110  };
111  m_pool.Schedule(std::move(right));
112  innerEnd = innerMid;
113  }
114  }
115  for (Index outer = outerBegin; outer < outerEnd; outer++)
116  for (Index inner = innerBegin; inner < innerEnd; inner += PacketSize) f(outer, inner);
117  barrier.Notify();
118  }
119 
// The inlining helper is only needed by the two parallelForImpl overloads; keep it out of the global macro namespace.
#undef EIGEN_PARALLEL_FOR_INLINE
121 
122  template <typename UnaryFunctor, int PacketSize>
123  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor& f, float cost) {
124  Index size = end - begin;
125  int maxLevel = calculateLevels<PacketSize>(size, cost);
126  Barrier barrier(1 << maxLevel);
127  parallelForImpl<UnaryFunctor, PacketSize>(begin, end, f, barrier, maxLevel);
128  barrier.Wait();
129  }
130 
131  template <typename BinaryFunctor, int PacketSize>
132  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin,
133  Index innerEnd, BinaryFunctor& f, float cost) {
134  Index outerSize = outerEnd - outerBegin;
135  Index innerSize = innerEnd - innerBegin;
136  Index size = outerSize * innerSize;
137  int maxLevel = calculateLevels<PacketSize>(size, cost);
138  Barrier barrier(1 << maxLevel);
139  parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerBegin, innerEnd, f, barrier, maxLevel);
140  barrier.Wait();
141  }
142 
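144  // m_costFactor is the inverse of the cost of delegating a task to a thread (threadCostThreshold is stored as-is);
145  // calculateLevels multiplies the total estimated cost by it, which avoids a floating point division.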
147 };
148 
149 // specialization of coefficient-wise assignment loops for CoreThreadPoolDevice
150 
151 namespace internal {
152 
// Compile-time evaluation-cost estimate for an assignment kernel, exposed as the static constexpr Index
// cost_helper<Kernel>::Cost.
// NOTE(review): source lines 157-159 (the SrcXprType / DstXprType aliases and the Cost definition) are missing
// from this listing, so how Cost is computed cannot be confirmed here.
153 template <typename Kernel>
154 struct cost_helper {
155  using SrcEvaluatorType = typename Kernel::SrcEvaluatorType;
156  using DstEvaluatorType = typename Kernel::DstEvaluatorType;
160 };
161 
// Parallel assignment loop that assigns one coefficient per functor call over an outer x inner grid.
// NOTE(review): the specialization header (source line 163) is missing from this listing.
162 template <typename Kernel>
164  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
// Adapter for the device's two-dimensional parallelFor: f(outer, inner) assigns a single coefficient.
// The constructor and operator() signature (source lines 166-167) are missing from this listing.
165  struct AssignmentFunctor : public Kernel {
168  this->assignCoeffByOuterInner(outer, inner);
169  }
170  };
171 
173  const Index innerSize = kernel.innerSize();
174  const Index outerSize = kernel.outerSize();
175  constexpr float cost = static_cast<float>(XprEvaluationCost);
176  AssignmentFunctor functor(kernel);
177  device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, 0, innerSize, functor, cost);
178  }
179 };
180 
// Parallel assignment loop that dispatches one functor call per outer index. The per-call cost is scaled by
// the compile-time inner size.
// NOTE(review): source lines 182-183 (the specialization header and the DstXprType alias) are missing from
// this listing.
181 template <typename Kernel>
184  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, InnerSize = DstXprType::InnerSizeAtCompileTime;
// Unary functor: operator()(Index outer) handles one outer index. Its constructor, signature, and body
// (source lines 186-188) are missing from this listing.
185  struct AssignmentFunctor : public Kernel {
189  }
190  };
192  const Index outerSize = kernel.outerSize();
193  AssignmentFunctor functor(kernel);
194  constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
195  device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, functor, cost);
196  }
197 };
198 
// Parallel assignment loop that assigns one PacketType packet per functor call over an outer x inner grid.
// NOTE(review): the specialization header (source line 200) is missing from this listing.
199 template <typename Kernel>
201  using PacketType = typename Kernel::PacketType;
202  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
203  SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
204  DstAlignment = Kernel::AssignmentTraits::DstAlignment;
// NOTE(review): the visible functor line passes Unaligned/Unaligned, so SrcAlignment/DstAlignment look unused
// in what is shown here. Confirm against the full source.
// f(outer, inner) assigns one packet via assignPacketByOuterInner. The constructor and operator() signature
// (source lines 206-207) are missing from this listing.
205  struct AssignmentFunctor : public Kernel {
208  this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
209  }
210  };
212  const Index innerSize = kernel.innerSize();
213  const Index outerSize = kernel.outerSize();
214  const float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize);
215  AssignmentFunctor functor(kernel);
216  device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, 0, innerSize, functor, cost);
217  }
218 };
219 
// Parallel assignment loop with a compile-time inner size (InnerSize) that dispatches one functor call per
// outer index.
// NOTE(review): source lines 221 and 223 are missing from this listing. 221 is presumably the specialization
// header; 223 declares the DstXprType alias per the cross-reference.
220 template <typename Kernel>
222  using PacketType = typename Kernel::PacketType;
224  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
225  SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
226  DstAlignment = Kernel::AssignmentTraits::DstAlignment,
227  InnerSize = DstXprType::InnerSizeAtCompileTime;
// Unary functor: operator()(Index outer) handles a single outer index. Its constructor, signature, and body
// (source lines 229-231) are missing from this listing.
228  struct AssignmentFunctor : public Kernel {
232  }
233  };
235  const Index outerSize = kernel.outerSize();
236  constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
237  AssignmentFunctor functor(kernel);
238  device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, functor, cost);
239  }
240 };
241 
// Parallel assignment loop that assigns each inner vector with full packets, then assigns the remainder one
// coefficient at a time.
// NOTE(review): the specialization header (source line 243) is missing from this listing.
242 template <typename Kernel>
244  using Scalar = typename Kernel::Scalar;
245  using PacketType = typename Kernel::PacketType;
246  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size;
// f(outer, inner) assigns one packet via assignPacketByOuterInner. The constructor and operator() signature
// (source lines 248-249) are missing from this listing.
247  struct PacketAssignmentFunctor : public Kernel {
250  this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
251  }
252  };
// f(outer) assigns the coefficients [round_down(innerSize, PacketSize), innerSize) of one outer index one at a
// time. Source lines 254-255, including the constructor, are missing from this listing.
253  struct ScalarAssignmentFunctor : public Kernel {
256  const Index innerSize = this->innerSize();
257  const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
258  for (Index inner = packetAccessSize; inner < innerSize; inner++) this->assignCoeffByOuterInner(outer, inner);
259  }
260  };
262  const Index outerSize = kernel.outerSize();
263  const Index innerSize = kernel.innerSize();
264  const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
265  constexpr float packetCost = static_cast<float>(XprEvaluationCost);
266  const float scalarCost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize - packetAccessSize);
267  PacketAssignmentFunctor packetFunctor(kernel);
268  ScalarAssignmentFunctor scalarFunctor(kernel);
269  device.template parallelFor<PacketAssignmentFunctor, PacketSize>(0, outerSize, 0, packetAccessSize, packetFunctor,
270  packetCost);
271  device.template parallelFor<ScalarAssignmentFunctor, 1>(0, outerSize, scalarFunctor, scalarCost);
272  };
273 };
274 
// Parallel assignment loop over a flat index range: f(index) assigns one coefficient via assignCoeff.
// NOTE(review): the specialization header (source line 276) is missing from this listing.
275 template <typename Kernel>
277  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
// The constructor (source line 279) is missing from this listing.
278  struct AssignmentFunctor : public Kernel {
280  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->assignCoeff(index); }
281  };
283  const Index size = kernel.size();
284  constexpr float cost = static_cast<float>(XprEvaluationCost);
285  AssignmentFunctor functor(kernel);
286  device.template parallelFor<AssignmentFunctor, 1>(0, size, functor, cost);
287  }
288 };
289 
// Parallel assignment loop over a flat index range using packet assignments.
// NOTE(review): the specialization header (source line 291) and source line 296 are missing from this listing.
// PacketSize, which run() uses, is presumably declared on line 296. Confirm against the full source.
290 template <typename Kernel>
292  using Scalar = typename Kernel::Scalar;
293  using PacketType = typename Kernel::PacketType;
294  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost,
295  RequestedAlignment = Kernel::AssignmentTraits::LinearRequiredAlignment,
297  DstIsAligned = Kernel::AssignmentTraits::DstAlignment >= RequestedAlignment,
298  DstAlignment = packet_traits<Scalar>::AlignedOnScalar ? RequestedAlignment
299  : Kernel::AssignmentTraits::DstAlignment,
300  SrcAlignment = Kernel::AssignmentTraits::JointAlignment;
// f(index) assigns one packet at `index` using DstAlignment/SrcAlignment. The constructor and operator()
// signature (source lines 302-303) are missing from this listing.
301  struct AssignmentFunctor : public Kernel {
304  this->template assignPacket<DstAlignment, SrcAlignment, PacketType>(index);
305  }
306  };
// NOTE(review): the run() signature (source line 307; per the cross-reference it is
// `static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device)`) and
// source line 313 are missing from this listing. Line 313 presumably handles [0, alignedStart). Confirm.
// The visible code evaluates the packet range [alignedStart, alignedEnd) in parallel. It then assigns
// [alignedEnd, size) on the calling thread.
308  const Index size = kernel.size();
// alignedStart is 0 when the destination is already aligned; otherwise it is the index returned by first_aligned.
309  const Index alignedStart =
310  DstIsAligned ? 0 : internal::first_aligned<RequestedAlignment>(kernel.dstDataPtr(), size);
// alignedEnd makes [alignedStart, alignedEnd) a whole number of packets.
311  const Index alignedEnd = alignedStart + numext::round_down(size - alignedStart, PacketSize);
312
314
// Cost of one packet assignment (one functor call).
315  constexpr float cost = static_cast<float>(XprEvaluationCost);
316  AssignmentFunctor functor(kernel);
317  device.template parallelFor<AssignmentFunctor, PacketSize>(alignedStart, alignedEnd, functor, cost);
318
// Coefficients after the last full packet are assigned directly on the calling thread.
319  unaligned_dense_assignment_loop<>::run(kernel, alignedEnd, size);
320  }
321 };
322 
323 } // namespace internal
324 
325 } // namespace Eigen
326 
327 #endif // EIGEN_CORE_THREAD_POOL_DEVICE_H
int i
Definition: BiCGSTAB_step_by_step.cpp:9
#define EIGEN_PARALLEL_FOR_INLINE
Definition: CoreThreadPoolDevice.h:69
#define EIGEN_DEVICE_FUNC
Definition: Macros.h:892
#define eigen_assert(x)
Definition: Macros.h:910
#define EIGEN_STRONG_INLINE
Definition: Macros.h:834
Scalar Scalar int size
Definition: benchVecAdd.cpp:17
SCALAR Scalar
Definition: bench_gemm.cpp:45
Definition: Barrier.h:21
void Wait()
Definition: Barrier.h:43
void Notify()
Definition: Barrier.h:28
Definition: NonBlockingThreadPool.h:19
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
Definition: NonBlockingThreadPool.h:120
int NumThreads() const EIGEN_FINAL
Definition: NonBlockingThreadPool.h:205
static int f(const TensorMap< Tensor< int, 3 > > &tensor)
Definition: cxx11_tensor_map.cpp:237
static constexpr lastp1_t end
Definition: IndexedViewHelper.h:79
@ InnerVectorizedTraversal
Definition: Constants.h:284
@ DefaultTraversal
Definition: Constants.h:279
@ InnerUnrolling
Definition: Constants.h:303
@ NoUnrolling
Definition: Constants.h:301
int log2_ceil(const BitsType &x)
Definition: MathFunctions.h:758
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE T maxi(const T &x, const T &y)
Definition: MathFunctions.h:926
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE EIGEN_CONSTEXPR T round_down(T a, U b)
Definition: MathFunctions.h:1266
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE T mini(const T &x, const T &y)
Definition: MathFunctions.h:920
Namespace containing all symbols from the Eigen library.
Definition: bench_norm.cpp:70
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition: Meta.h:83
Definition: Eigen_Colamd.h:49
CwiseBinaryOp< internal::scalar_sum_op< double, double >, const CpyMatrixXd, const CpyMatrixXd > XprType
Definition: nestbyvalue.cpp:15
Definition: CoreThreadPoolDevice.h:42
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool &pool, float threadCostThreshold=3e-5f)
Definition: CoreThreadPoolDevice.h:44
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin, Index innerEnd, BinaryFunctor &f, float cost)
Definition: CoreThreadPoolDevice.h:132
EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index begin, Index end, UnaryFunctor &f, Barrier &barrier, int level)
Definition: CoreThreadPoolDevice.h:73
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor &f, float cost)
Definition: CoreThreadPoolDevice.h:123
ThreadPool & m_pool
Definition: CoreThreadPoolDevice.h:143
std::function< void()> Task
Definition: CoreThreadPoolDevice.h:43
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int calculateLevels(Index size, float cost) const
Definition: CoreThreadPoolDevice.h:51
EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index outerBegin, Index outerEnd, Index innerBegin, Index innerEnd, BinaryFunctor &f, Barrier &barrier, int level)
Definition: CoreThreadPoolDevice.h:91
float m_costFactor
Definition: CoreThreadPoolDevice.h:146
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, Index outer)
Definition: AssignEvaluator.h:220
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, Index outer)
Definition: AssignEvaluator.h:281
Definition: CoreThreadPoolDevice.h:154
typename Kernel::SrcEvaluatorType SrcEvaluatorType
Definition: CoreThreadPoolDevice.h:155
typename SrcEvaluatorType::XprType SrcXprType
Definition: CoreThreadPoolDevice.h:157
typename Kernel::DstEvaluatorType DstEvaluatorType
Definition: CoreThreadPoolDevice.h:156
static constexpr Index Cost
Definition: CoreThreadPoolDevice.h:159
typename DstEvaluatorType::XprType DstXprType
Definition: CoreThreadPoolDevice.h:158
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE ScalarAssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:254
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:172
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:307
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:234
typename Kernel::DstEvaluatorType::XprType DstXprType
Definition: CoreThreadPoolDevice.h:223
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:282
typename Kernel::DstEvaluatorType::XprType DstXprType
Definition: CoreThreadPoolDevice.h:183
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:191
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner)
Definition: CoreThreadPoolDevice.h:249
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketAssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:248
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:229
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer)
Definition: CoreThreadPoolDevice.h:230
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:279
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index)
Definition: CoreThreadPoolDevice.h:280
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index)
Definition: CoreThreadPoolDevice.h:303
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:302
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:186
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer)
Definition: CoreThreadPoolDevice.h:187
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner)
Definition: CoreThreadPoolDevice.h:167
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:166
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:211
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner)
Definition: CoreThreadPoolDevice.h:207
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel &kernel)
Definition: CoreThreadPoolDevice.h:206
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel &kernel, CoreThreadPoolDevice &device)
Definition: CoreThreadPoolDevice.h:261
Definition: XprHelper.h:263
Definition: GenericPacketMath.h:108
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE EIGEN_CONSTEXPR void run(Kernel &, Index, Index)
Definition: AssignEvaluator.h:363
Definition: GenericPacketMath.h:134