Parallelizer.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2010 Gael Guennebaud <gael.guennebaud@inria.fr>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_PARALLELIZER_H
#define EIGEN_PARALLELIZER_H

// IWYU pragma: private
#include "../InternalHeaderCheck.h"

// Note that in the following, there are 3 different uses of the concept
// "number of threads":
//  1. Max number of threads used by OpenMP or ThreadPool.
//     * For OpenMP this is typically the value set by the OMP_NUM_THREADS
//       environment variable, or by a call to omp_set_num_threads() prior to
//       calling Eigen.
//     * For ThreadPool, this is the number of threads in the ThreadPool.
//  2. Max number of threads currently allowed to be used by parallel Eigen
//     operations. This is set by setNbThreads(), and cannot exceed the value
//     in 1.
//  3. The actual number of threads used for a given parallel Eigen operation.
//     This is typically computed on the fly using a cost model and cannot
//     exceed the value in 2.
//     * For OpenMP, this is typically the number of threads specified in
//       individual "omp parallel" pragmas associated with an Eigen operation.
//     * For ThreadPool, it is the number of concurrent tasks scheduled in the
//       threadpool for a given Eigen operation. Notice that since the
//       threadpool uses task stealing, there is no way to limit the number of
//       concurrently executing tasks to below the number in 1. except by
//       limiting the total number of tasks in flight.

#if defined(EIGEN_HAS_OPENMP) && defined(EIGEN_GEMM_THREADPOOL)
#error "EIGEN_HAS_OPENMP and EIGEN_GEMM_THREADPOOL may not both be defined."
#endif

namespace Eigen {

namespace internal {
inline void manage_multi_threading(Action action, int* v);
}

// Public APIs.

// Kept for backwards compatibility: calling initParallel() is no longer required.
EIGEN_DEPRECATED inline void initParallel() {}

// Returns the max number of threads reserved for parallel Eigen operations.
// \sa setNbThreads
inline int nbThreads() {
  int ret;
  internal::manage_multi_threading(GetAction, &ret);
  return ret;
}

// Sets the max number of threads reserved for parallel Eigen operations.
// Passing v == 0 restores the default, i.e. the number of threads made
// available by OpenMP or the ThreadPool.
// \sa nbThreads
inline void setNbThreads(int v) { internal::manage_multi_threading(SetAction, &v); }

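// Usage sketch (illustrative only; A, B, C below are placeholder MatrixXd operands):
// capping the number of threads used by parallel products and later restoring the default.
//
//   Eigen::setNbThreads(4);       // allow at most 4 threads for parallel Eigen operations
//   int n = Eigen::nbThreads();   // query the current cap
//   C.noalias() = A * B;          // the product may now use up to n threads
//   Eigen::setNbThreads(0);       // restore the OpenMP / ThreadPool default
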
#ifdef EIGEN_GEMM_THREADPOOL
// Sets the ThreadPool used by Eigen parallel Gemm.
//
// NOTICE: This function has a known race condition with
// parallelize_gemm below, and should not be called while
// an instance of that function is running.
//
// TODO(rmlarsen): Make the device API available instead of
// storing a local static pointer variable to avoid this issue.
inline ThreadPool* setGemmThreadPool(ThreadPool* new_pool) {
  static ThreadPool* pool = nullptr;
  if (new_pool != nullptr) {
    // Install the new pool. Note that this simply swaps the pointer: it does not
    // wait for pending work in the old pool, nor does it destroy either pool;
    // ownership of both pools remains with the caller.
    pool = new_pool;
    // Reset the number of threads to the number of threads on the new pool.
    setNbThreads(pool->NumThreads());
  }
  return pool;
}

// Gets the ThreadPool used by Eigen parallel Gemm.
inline ThreadPool* getGemmThreadPool() { return setGemmThreadPool(nullptr); }
#endif
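
// Usage sketch (illustrative only; gemm_pool is a placeholder name, and this path is
// only available when EIGEN_GEMM_THREADPOOL is defined):
//
//   static Eigen::ThreadPool gemm_pool(8);   // pool owned by the application
//   Eigen::setGemmThreadPool(&gemm_pool);    // Eigen GEMM tasks are now scheduled onto gemm_pool
//   Eigen::setNbThreads(4);                  // optionally cap Eigen below the pool size
//
// The pool must outlive any Eigen products that may run on it, and the call must not
// overlap with a running parallelize_gemm (see the NOTICE above).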

namespace internal {

// Implementation.

#if defined(EIGEN_USE_BLAS) || (!defined(EIGEN_HAS_OPENMP) && !defined(EIGEN_GEMM_THREADPOOL))

// Single-threaded fallback, used when Eigen delegates to BLAS or when neither
// OpenMP nor the GEMM ThreadPool is available: setNbThreads() has no effect and
// nbThreads() always reports a single thread.
inline void manage_multi_threading(Action action, int* v) {
  if (action == SetAction) {
    eigen_internal_assert(v != nullptr);
  } else if (action == GetAction) {
    eigen_internal_assert(v != nullptr);
    *v = 1;
  } else {
    eigen_internal_assert(false);
  }
}

template <typename Index>
struct GemmParallelInfo {};

template <bool Condition, typename Functor, typename Index>
EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, Index /*unused*/,
                                          bool /*unused*/) {
  // No threading back-end available: run the whole product on the calling thread.
  func(0, rows, 0, cols);
}

#else

template <typename Index>
struct GemmParallelTaskInfo {
  GemmParallelTaskInfo() : sync(-1), users(0), lhs_start(0), lhs_length(0) {}
  std::atomic<Index> sync;
  std::atomic<int> users;
  Index lhs_start;
  Index lhs_length;
};

template <typename Index>
struct GemmParallelInfo {
  const int logical_thread_id;
  const int num_threads;
  GemmParallelTaskInfo<Index>* task_info;

  GemmParallelInfo(int logical_thread_id_, int num_threads_, GemmParallelTaskInfo<Index>* task_info_)
      : logical_thread_id(logical_thread_id_), num_threads(num_threads_), task_info(task_info_) {}
};

inline void manage_multi_threading(Action action, int* v) {
  static int m_maxThreads = -1;
  if (action == SetAction) {
    eigen_internal_assert(v != nullptr);
#if defined(EIGEN_HAS_OPENMP)
    // Calling SetAction with *v == 0 restores m_maxThreads to the maximum
    // number of threads specified for OpenMP.
    eigen_internal_assert(*v >= 0);
    int omp_threads = omp_get_max_threads();
    m_maxThreads = (*v == 0 ? omp_threads : std::min(*v, omp_threads));
#elif defined(EIGEN_GEMM_THREADPOOL)
    // Calling SetAction with *v == 0 restores m_maxThreads to the number of
    // threads in the ThreadPool, which defaults to 1 if no pool was provided.
    eigen_internal_assert(*v >= 0);
    ThreadPool* pool = getGemmThreadPool();
    int pool_threads = pool != nullptr ? pool->NumThreads() : 1;
    m_maxThreads = (*v == 0 ? pool_threads : numext::mini(pool_threads, *v));
#endif
  } else if (action == GetAction) {
    eigen_internal_assert(v != nullptr);
#if defined(EIGEN_HAS_OPENMP)
    if (m_maxThreads > 0)
      *v = m_maxThreads;
    else
      *v = omp_get_max_threads();
#else
    *v = m_maxThreads;
#endif
  } else {
    eigen_internal_assert(false);
  }
}

template <bool Condition, typename Functor, typename Index>
void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, bool transpose) {
  // Dynamically check whether we should even try to execute in parallel.
  // The conditions are:
  // - the max number of threads we can create is greater than 1,
  // - we are not already inside a parallel region,
  // - the sizes are large enough.

  // Compute the maximal number of threads from the size of the product:
  // this first heuristic takes into account that the product kernel is fully
  // optimized when working with nr columns at once.
  Index size = transpose ? rows : cols;
  Index pb_max_threads = std::max<Index>(1, size / Functor::Traits::nr);

  // Compute the maximal number of threads from the total amount of work:
  double work = static_cast<double>(rows) * static_cast<double>(cols) * static_cast<double>(depth);
  double kMinTaskSize = 50000;  // FIXME improve this heuristic.
  pb_max_threads = std::max<Index>(1, std::min<Index>(pb_max_threads, static_cast<Index>(work / kMinTaskSize)));

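  // Worked example of the two heuristics above (Traits::nr == 4 assumed purely for
  // illustration): for a 256x256x256 non-transposed product, size / nr = 256 / 4 = 64
  // and work / kMinTaskSize = 256^3 / 50000 ~= 335, so pb_max_threads = min(64, 335) = 64,
  // and at most min(nbThreads(), 64) threads will be used below.
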
  // Compute the number of threads we are going to use.
  int threads = std::min<int>(nbThreads(), static_cast<int>(pb_max_threads));

  // If multi-threading is explicitly disabled, not useful, or if we already are
  // inside a parallel session, then abort multi-threading.
  bool dont_parallelize = (!Condition) || (threads <= 1);
#if defined(EIGEN_HAS_OPENMP)
  // Don't parallelize if we are executing in a parallel context already.
  dont_parallelize |= omp_get_num_threads() > 1;
#elif defined(EIGEN_GEMM_THREADPOOL)
  // Don't parallelize if we have a trivial threadpool or if the current thread id
  // is != -1, indicating that we are already executing on a thread inside the pool.
  // In other words, we do not allow nested parallelism, since this would lead to
  // deadlocks due to the work-stealing nature of the threadpool.
  ThreadPool* pool = getGemmThreadPool();
  dont_parallelize |= (pool == nullptr || pool->CurrentThreadId() != -1);
#endif
  if (dont_parallelize) return func(0, rows, 0, cols);

206 
207  func.initParallelSession(threads);
208 
209  if (transpose) std::swap(rows, cols);
210 
211  ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo<Index>, task_info, threads, 0);
212 
#if defined(EIGEN_HAS_OPENMP)
#pragma omp parallel num_threads(threads)
  {
    Index i = omp_get_thread_num();
    // Note that the actual number of threads might be lower than the number of
    // requested ones.
    Index actual_threads = omp_get_num_threads();
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);

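    // Partitioning example (Traits::mr == 8 assumed purely for illustration): with
    // rows == cols == 1000 and actual_threads == 8, blockCols = (1000 / 8) & ~3 = 124 and
    // blockRows = (125 / 8) * 8 = 120, so thread i starts at column i * 124 and row i * 120,
    // and the last thread absorbs the remainder (132 columns, 160 rows); together the
    // threads cover the whole matrix exactly once.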
    Index blockCols = (cols / actual_threads) & ~Index(0x3);
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;

    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);
  }

#elif defined(EIGEN_GEMM_THREADPOOL)
  Barrier barrier(threads);
  auto task = [=, &func, &barrier, &task_info](int i) {
    Index actual_threads = threads;
    GemmParallelInfo<Index> info(i, static_cast<int>(actual_threads), task_info);
    Index blockCols = (cols / actual_threads) & ~Index(0x3);
    Index blockRows = (rows / actual_threads);
    blockRows = (blockRows / Functor::Traits::mr) * Functor::Traits::mr;

    Index r0 = i * blockRows;
    Index actualBlockRows = (i + 1 == actual_threads) ? rows - r0 : blockRows;

    Index c0 = i * blockCols;
    Index actualBlockCols = (i + 1 == actual_threads) ? cols - c0 : blockCols;

    info.task_info[i].lhs_start = r0;
    info.task_info[i].lhs_length = actualBlockRows;

    if (transpose)
      func(c0, actualBlockCols, 0, rows, &info);
    else
      func(0, rows, c0, actualBlockCols, &info);

    barrier.Notify();
  };
  // Notice that we do not schedule more than "threads" tasks, which allows us to
  // limit the number of running threads, even if the threadpool itself was
  // constructed with a larger number of threads.
  for (int i = 0; i < threads - 1; ++i) {
    pool->Schedule([=, task = std::move(task)] { task(i); });
  }
  task(threads - 1);
  barrier.Wait();
#endif
}

#endif

}  // end namespace internal
}  // end namespace Eigen

#endif  // EIGEN_PARALLELIZER_H