TensorDeviceThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
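
// Example: a minimal custom Allocator (sketch only; the class name
// MyMallocAllocator and its use of std::malloc/std::free are illustrative
// assumptions, not part of Eigen):
//
//   class MyMallocAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override { return std::malloc(num_bytes); }
//     void deallocate(void* buffer) const override { std::free(buffer); }
//   };
//
// An instance can be passed to the ThreadPoolDevice constructor below to
// route the device's allocations through it instead of aligned_malloc.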

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
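
  // Example: building a device on top of Eigen's own pool (a sketch, assuming
  // the Eigen::ThreadPool implementation from the CXX11 ThreadPool module):
  //
  //   Eigen::ThreadPool pool(4);                 // pool owns 4 worker threads
  //   Eigen::ThreadPoolDevice device(&pool, 4);  // device only borrows the pool
  //
  // The pool must outlive the device; the device never deletes it.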

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes) : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }

  template <typename Type>
  EIGEN_STRONG_INLINE Type* get(Type* data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // TODO(rmlarsen): Align blocks on cache lines.
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to the threads competing for memory bandwidth, so we
    // statically schedule at most 4 block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last num_threads - 1 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Launch the first block on the main thread.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const { ::memset(buffer, c, n); }

  template <typename T>
  EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
    std::fill(begin, end, value);
  }

  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }

  // Number of threads available in the underlying thread pool. This number can
  // be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
    // Nothing. Threadpool device operations are synchronous.
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, std::forward<Function>(f), args...));
    return n;
  }
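
  // Example: scheduling a single task and waiting for it (a sketch; since
  // enqueue() allocates the Notification with new and never frees it, the
  // assumption here is that the caller deletes it after waiting):
  //
  //   Eigen::Notification* done = device.enqueue([] { /* work */ });
  //   Eigen::wait_until_ready(done);  // blocks until the task has run
  //   delete done;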

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b, std::forward<Function>(f), args...));
  }
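
  // Example: scheduling a fixed number of tasks and waiting for all of them
  // (a sketch; the task count of 3 is arbitrary):
  //
  //   Eigen::Barrier barrier(3);
  //   for (int i = 0; i < 3; ++i) {
  //     device.enqueue_with_barrier(&barrier, [i] { /* work on chunk i */ });
  //   }
  //   barrier.Wait();  // returns once all three tasks have called Notify()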

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::forward<Function>(f), args...));
    } else {
      pool_->Schedule(std::forward<Function>(f));
    }
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const { return pool_->CurrentThreadId(); }

  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
  // waits for completion. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
      // Compute small problems directly in the caller thread.
    } else if (n == 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [this, block, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less, execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
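
  // Example: summing an array with the convenience overload (a sketch; the
  // per-element cost estimate and the mutex-based reduction are illustrative
  // assumptions, and <mutex>/<vector> are assumed to be included):
  //
  //   std::vector<double> data(1 << 20, 1.0);
  //   double total = 0.0;
  //   std::mutex mu;
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(double),
  //                            /*bytes_stored=*/0, /*compute_cycles=*/1);
  //   device.parallelFor(data.size(), cost,
  //                      [&](Eigen::Index first, Eigen::Index last) {
  //                        double local = 0;
  //                        for (Eigen::Index i = first; i < last; ++i) local += data[i];
  //                        std::lock_guard<std::mutex> lock(mu);  // blocks may run concurrently
  //                        total += local;
  //                      });
  //   // parallelFor returns only after every block has executed.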

  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
  // without waiting for completion. When the last block finishes, it calls the
  // 'done' callback. F accepts a half-open interval [first, last). Block size
  // is chosen based on the iteration cost and resulting parallel efficiency. If
  // block_align is not nullptr, it is called to round up the block size.
  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f, std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx = new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete async context if it was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
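
  // Example: the asynchronous variant; the caller supplies a completion
  // callback instead of blocking (a sketch; signalling a Notification from
  // 'done' is just one way to observe completion, and n/cost are assumed to
  // be defined as in the synchronous example above):
  //
  //   Eigen::Notification all_done;
  //   device.parallelForAsync(
  //       n, cost,
  //       [&](Eigen::Index first, Eigen::Index last) { /* process [first, last) */ },
  //       [&] { all_done.Notify(); });
  //   // ... do other work on the calling thread ...
  //   all_done.Wait();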

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync we must keep the passed-in closures on the heap and
  // delete them only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count, std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count), f(std::move(block_f)), done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Calculates block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks to be large enough to amortize parallelization
  // overhead, yet small enough to limit the tail effect and potential load
  // imbalance, and we also want the number of blocks to divide evenly across
  // threads.
  ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
                                             std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(numext::div_ceil<Index>(n, max_oversharding_factor * numThreads()), block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = numext::div_ceil(n, block_size);

    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) / (numext::div_ceil<Index>(block_count, numThreads()) * numThreads());

    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count; max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = numext::div_ceil(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = numext::div_ceil(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency = static_cast<double>(coarser_block_count) /
                                        (numext::div_ceil<Index>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
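
  // Worked example of the sizing logic above (assuming the per-iteration cost
  // is cheap enough that block_size_f is not the binding constraint): with
  // n = 1000 and 4 threads, the initial block_size is
  // div_ceil(1000, 4 * 4) = 63, so block_count = div_ceil(1000, 63) = 16 and
  // the efficiency is 16 / (div_ceil(16, 4) * 4) = 1.0; the coarsening loop
  // exits immediately and {63, 16} is returned.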

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H