ForkJoin.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2025 Weiwei Kong <weiweikong@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_THREADPOOL_FORKJOIN_H
11 #define EIGEN_THREADPOOL_FORKJOIN_H
12 
13 // IWYU pragma: private
14 #include "./InternalHeaderCheck.h"
15 
16 namespace Eigen {
17 
18 // ForkJoinScheduler provides implementations of various non-blocking ParallelFor algorithms for unary
19 // and binary parallel tasks. More specifically, the implementations follow the binary tree-based
20 // algorithm from the following paper:
21 //
22 // Lea, D. (2000, June). A java fork/join framework. *In Proceedings of the
23 // ACM 2000 conference on Java Grande* (pp. 36-43).
24 //
25 // For a given binary task function `f(i,j)` and integers `num_threads`, `granularity`, `start`, and `end`,
26 // the implemented parallel for algorithm schedules and executes at most `num_threads` of the functions
27 // from the following set in parallel (either synchronously or asynchronously):
28 //
29 // f(start,start+s_1), f(start+s_1,start+s_2), ..., f(start+s_n,end)
30 //
31 // where each chunk length `s_{j+1} - s_{j}` and `end - (start + s_n)` is at most `granularity` (and usually close to it; a trailing remainder chunk can be much shorter). For a unary
32 // task function `g(k)`, the same operation is applied with
33 //
34 // f(i,j) = [&](){ for(int k = i; k < j; ++k) g(k); };
35 //
36 // Note that the parameter `granularity` should be tuned by the user based on the trade-off of running the
37 // given task function sequentially vs. scheduling individual tasks in parallel. An example of a partially
38 // tuned `granularity` is in `Eigen::CoreThreadPoolDevice::parallelFor(...)` where the template
39 // parameter `PacketSize` and float input `cost` are used to indirectly compute a granularity level for a
40 // given task function.
41 //
42 // Example usage #1 (synchronous):
43 // ```
44 // ThreadPool thread_pool(num_threads);
45 // ForkJoinScheduler::ParallelFor(0, num_tasks, granularity, std::move(parallel_task), &thread_pool);
46 // ```
47 //
48 // Example usage #2 (asynchronous):
49 // ```
50 // ThreadPool thread_pool(num_threads);
51 // Barrier barrier(num_tasks * num_async_calls);
52 // auto done = [&](){barrier.Notify();};
53 // for (int k=0; k<num_async_calls; ++k) {
54 // thread_pool.Schedule([&](){
55 // ForkJoinScheduler::ParallelForAsync(0, num_tasks, granularity, parallel_task, done, &thread_pool);
56 // });
57 // }
58 // barrier.Wait();
59 // ```
61  public:
62  // Runs `do_func` asynchronously for the range [start, end) with a specified granularity. `do_func` should
63  // either be of type `std::function<void(int)>` or `std::function<void(int, int)`.
64  // If `end > start`, the `done` callback will be called `end - start` times when all tasks have been
65  // executed. Otherwise, `done` is called only once.
66  template <typename DoFnType>
67  static void ParallelForAsync(int start, int end, int granularity, DoFnType do_func, std::function<void()> done,
68  Eigen::ThreadPool* thread_pool) {
69  if (start >= end) {
70  done();
71  return;
72  }
73  ForkJoinScheduler::RunParallelForAsync(start, end, granularity, do_func, done, thread_pool);
74  }
75 
76  // Synchronous variant of ParallelForAsync.
77  template <typename DoFnType>
78  static void ParallelFor(int start, int end, int granularity, DoFnType do_func, Eigen::ThreadPool* thread_pool) {
79  if (start >= end) return;
80  auto dummy_done = []() {};
81  Barrier barrier(1);
82  thread_pool->Schedule([start, end, granularity, thread_pool, &do_func, &dummy_done, &barrier]() {
83  ForkJoinScheduler::ParallelForAsync(start, end, granularity, do_func, dummy_done, thread_pool);
84  barrier.Notify();
85  });
86  barrier.Wait();
87  }
88 
89  private:
90  // Schedules `right_thunk`, runs `left_thunk`, and runs other tasks until `right_thunk` has finished.
91  template <typename LeftType, typename RightType>
92  static void ForkJoin(LeftType&& left_thunk, RightType&& right_thunk, Eigen::ThreadPool* thread_pool) {
93  std::atomic<bool> right_done(false);
94  auto execute_right = [&right_thunk, &right_done]() {
95  std::forward<RightType>(right_thunk)();
96  right_done.store(true, std::memory_order_release);
97  };
98  thread_pool->Schedule(execute_right);
99  std::forward<LeftType>(left_thunk)();
101  while (!right_done.load(std::memory_order_acquire)) {
102  thread_pool->MaybeGetTask(&task);
103  if (task.f) task.f();
104  }
105  }
106 
107  // Runs `do_func` in parallel for the range [start, end). The main recursive asynchronous runner that
108  // calls `ForkJoin`.
109  static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int)>& do_func,
110  std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
111  std::function<void(int, int)> wrapped_do_func = [&do_func](int start, int end) {
112  for (int i = start; i < end; ++i) do_func(i);
113  };
114  ForkJoinScheduler::RunParallelForAsync(start, end, granularity, wrapped_do_func, done, thread_pool);
115  }
116 
117  // Variant of `RunAsyncParallelFor` that uses a do function that operates on an index range.
118  // Specifically, `do_func` takes two arguments: the start and end of the range.
119  static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int, int)>& do_func,
120  std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
121  if ((end - start) <= granularity) {
122  do_func(start, end);
123  for (int j = 0; j < end - start; ++j) done();
124  } else {
125  // Typical workloads choose initial values of `{start, end, granularity}` such that `start - end` and
126  // `granularity` are powers of two. Since modern processors usually implement (2^x)-way
127  // set-associative caches, we minimize the number of cache misses by choosing midpoints that are not
128  // powers of two (to avoid having two addresses in the main memory pointing to the same point in the
129  // cache). More specifically, we restrict the set of candidate midpoints to:
130  //
131  // P := {start, start + granularity, start + 2*granularity, ..., end},
132  //
133  // and choose the entry in `P` at (roughly) the 9/16 mark.
134  const int size = end - start;
135  const int mid = start + Eigen::numext::div_ceil(9 * (size + 1) / 16, granularity) * granularity;
137  [start, mid, granularity, &do_func, &done, thread_pool]() {
138  RunParallelForAsync(start, mid, granularity, do_func, done, thread_pool);
139  },
140  [mid, end, granularity, &do_func, &done, thread_pool]() {
141  RunParallelForAsync(mid, end, granularity, do_func, done, thread_pool);
142  },
143  thread_pool);
144  }
145  }
146 };
147 
148 } // namespace Eigen
149 
150 #endif // EIGEN_THREADPOOL_FORKJOIN_H
int i
Definition: BiCGSTAB_step_by_step.cpp:9
Scalar Scalar int size
Definition: benchVecAdd.cpp:17
Definition: Barrier.h:21
void Wait()
Definition: Barrier.h:43
void Notify()
Definition: Barrier.h:28
Definition: ForkJoin.h:60
static void ParallelFor(int start, int end, int granularity, DoFnType do_func, Eigen::ThreadPool *thread_pool)
Definition: ForkJoin.h:78
static void ParallelForAsync(int start, int end, int granularity, DoFnType do_func, std::function< void()> done, Eigen::ThreadPool *thread_pool)
Definition: ForkJoin.h:67
static void RunParallelForAsync(int start, int end, int granularity, std::function< void(int, int)> &do_func, std::function< void()> &done, Eigen::ThreadPool *thread_pool)
Definition: ForkJoin.h:119
static void RunParallelForAsync(int start, int end, int granularity, std::function< void(int)> &do_func, std::function< void()> &done, Eigen::ThreadPool *thread_pool)
Definition: ForkJoin.h:109
static void ForkJoin(LeftType &&left_thunk, RightType &&right_thunk, Eigen::ThreadPool *thread_pool)
Definition: ForkJoin.h:92
Definition: NonBlockingThreadPool.h:19
Environment::Task Task
Definition: NonBlockingThreadPool.h:22
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
Definition: NonBlockingThreadPool.h:120
void MaybeGetTask(Task *t)
Definition: NonBlockingThreadPool.h:157
static constexpr lastp1_t end
Definition: IndexedViewHelper.h:79
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE EIGEN_CONSTEXPR T div_ceil(T a, T b)
Definition: MathFunctions.h:1251
Namespace containing all symbols from the Eigen library.
Definition: bench_norm.cpp:70
void start(const unsigned &i)
(Re-)start i-th timer
Definition: oomph_utilities.cc:243
std::ptrdiff_t j
Definition: tut_arithmetic_redux_minmax.cpp:2