#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

#include "./InternalHeaderCheck.h"

namespace Eigen {
// Runs an arbitrary function and then calls Notify() on the passed-in Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) n->Notify();
  }
};

// Same as above, but signals a Barrier instead of a Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) b->Notify();
  }
};
// Blocks until the given synchronization object is signaled.
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) n->Wait();
}
// An abstract interface to a device-specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
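// A minimal sketch of a custom Allocator, for illustration only (this
// `TrackingAllocator` is not part of Eigen): it forwards to Eigen's aligned
// allocation helpers while counting allocation calls. Any allocator handed
// to ThreadPoolDevice only needs to satisfy the two pure virtuals above.
//
//   class TrackingAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       ++num_allocations_;  // hypothetical bookkeeping
//       return internal::aligned_malloc(num_bytes);
//     }
//     void deallocate(void* buffer) const override { internal::aligned_free(buffer); }
//
//    private:
//     mutable std::atomic<size_t> num_allocations_{0};
//   };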
// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes) : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) allocator_->deallocate(buffer);
    else internal::aligned_free(buffer);
  }
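  // Typical setup, sketched on the assumption that the caller owns the pool
  // (thread and core counts here are illustrative):
  //
  //   Eigen::ThreadPool pool(4 /*threads*/);
  //   Eigen::ThreadPoolDevice device(&pool, 4 /*cores*/);
  //   Eigen::Tensor<float, 2> a(100, 100), b(100, 100), c(100, 100);
  //   // ... fill a and b ...
  //   c.device(device) = a + b;  // expression evaluated on the pool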
  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }
  template <typename Type>
  EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }
  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to memory bandwidth limits.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the trailing blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on this thread, then wait for the workers.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
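  // Worked example of the split above (numbers illustrative): copying
  // n = 1 MiB (1048576 bytes) with num_threads = 4 gives
  // blocksize = (1048576 + 3) / 4 = 262144, so threads 1..3 each copy
  // 262144 bytes via the barrier-tracked tasks while the caller copies
  // block 0 itself; numext::mini() trims the last block whenever n is not
  // an exact multiple of blocksize.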
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
  template <typename T>
  EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
    std::fill(begin, end, value);
  }

  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, std::forward<Function>(f), args...));
    return n;
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b, std::forward<Function>(f), args...));
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::forward<Function>(f), args...));
    } else {
      pool_->Schedule(std::forward<Function>(f));
    }
  }
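  // Sketch of how the three enqueue variants differ (illustrative only):
  //
  //   Eigen::Notification* done = device.enqueue([] { /* work */ });
  //   done->Wait();   // block until the task has run
  //   delete done;    // the caller owns the returned Notification
  //
  //   Eigen::Barrier barrier(2);
  //   device.enqueue_with_barrier(&barrier, [] { /* task 1 */ });
  //   device.enqueue_with_barrier(&barrier, [] { /* task 2 */ });
  //   barrier.Wait();  // block until both tasks have notified
  //
  //   device.enqueueNoNotification([] { /* fire and forget */ });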
  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Executes f over [0, n) in parallel and waits for completion. f accepts a
  // half-open interval [firstIdx, lastIdx). The block size is chosen from the
  // iteration cost and the resulting parallel efficiency. If block_align is
  // not nullptr, it is called to round up the block size.
  void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
      // Compute small problems directly in the caller thread.
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide the range into halves until we reach block.size.
    // The midpoint is rounded to a multiple of block.size, so we are
    // guaranteed to get exactly block.count leaves that do the actual work.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [this, block, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less: execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }
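  // Usage sketch (the cost figures are illustrative assumptions): process n
  // elements where each iteration loads and stores 8 bytes and spends about
  // 10 compute cycles; the device picks the grain size from that cost.
  //
  //   device.parallelFor(n,
  //                      Eigen::TensorOpCost(8 /*bytes loaded*/, 8 /*bytes stored*/,
  //                                          10 /*compute cycles*/),
  //                      [&](Index first, Index last) {
  //                        for (Index i = first; i < last; ++i) data[i] *= 2;
  //                      });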
  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Executes f over [0, n) in parallel without waiting for completion. When
  // the last block finishes, the `done` callback is invoked.
  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f, std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx = new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide the range into halves until we reach block.size.
    // The midpoint is rounded to a multiple of block.size, so we are
    // guaranteed to get exactly block.count leaves that do the actual work.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less: execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }
  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
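  // Usage sketch (cost and callback are illustrative assumptions): the same
  // doubling loop as above, but returning immediately. `done` runs on
  // whichever pool thread finishes the last block, so it must be thread-safe
  // and must not outlive the data it touches.
  //
  //   Eigen::Notification finished;
  //   device.parallelForAsync(
  //       n, Eigen::TensorOpCost(8, 8, 10),
  //       [&](Index first, Index last) {
  //         for (Index i = first; i < last; ++i) data[i] *= 2;
  //       },
  //       [&]() { finished.Notify(); });
  //   // ... do other work ...
  //   finished.Wait();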
  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;
  // For parallelForAsync we must keep the passed-in closures on the heap and
  // delete them only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count, std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count), f(std::move(block_f)), done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };
  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };
  // Calculates block size based on (1) the iteration cost and (2) parallel
  // efficiency: blocks should be large enough to amortize parallelization
  // overhead, small enough to mitigate tail effects and load imbalance, and
  // the block count should divide evenly across threads.
  ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
                                             std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(numext::div_ceil<Index>(n, max_oversharding_factor * numThreads()), block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = numext::div_ceil(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time used
    // for computations:
    double max_efficiency =
        static_cast<double>(block_count) / (numext::div_ceil<Index>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as
    // that does not decrease parallel efficiency.
    for (Index prev_block_count = block_count; max_efficiency < 1.0 && prev_block_count > 1;) {
      // The next block size that yields fewer blocks than prev_block_count.
      Index coarser_block_size = numext::div_ceil(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = numext::div_ceil(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (numext::div_ceil<Index>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
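  // Worked example of the efficiency metric (numbers illustrative): with
  // numThreads() = 4 and block_count = 10, efficiency is
  // 10 / (div_ceil(10, 4) * 4) = 10 / 12 ~= 0.83, because the last "wave"
  // of threads runs only two blocks. Coarsening to block_count = 8 gives
  // 8 / (2 * 4) = 1.0, so the loop above accepts the coarser blocks.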
  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H