threads_non_blocking_thread_pool.cpp File Reference
#include "main.h"
#include "Eigen/ThreadPool"

Macros

#define EIGEN_USE_THREADS
 

Functions

static void test_create_destroy_empty_pool ()
 
static void test_parallelism (bool allow_spinning)
 
static void test_cancel ()
 
static void test_pool_partitions ()
 
 EIGEN_DECLARE_TEST (cxx11_non_blocking_thread_pool)
 

Macro Definition Documentation

◆ EIGEN_USE_THREADS

#define EIGEN_USE_THREADS

Function Documentation

◆ EIGEN_DECLARE_TEST()

EIGEN_DECLARE_TEST ( cxx11_non_blocking_thread_pool  )
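167  {
168  CALL_SUBTEST(test_create_destroy_empty_pool());
169  CALL_SUBTEST(test_parallelism(true));
170  CALL_SUBTEST(test_parallelism(false));
171  CALL_SUBTEST(test_cancel());
172  CALL_SUBTEST(test_pool_partitions());
173 }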
#define CALL_SUBTEST(FUNC)
Definition: main.h:382
static void test_cancel()
Definition: threads_non_blocking_thread_pool.cpp:100
static void test_pool_partitions()
Definition: threads_non_blocking_thread_pool.cpp:113
static void test_create_destroy_empty_pool()
Definition: threads_non_blocking_thread_pool.cpp:15
static void test_parallelism(bool allow_spinning)
Definition: threads_non_blocking_thread_pool.cpp:23

References CALL_SUBTEST, test_cancel(), test_create_destroy_empty_pool(), test_parallelism(), and test_pool_partitions().

◆ test_cancel()

static void test_cancel ( )
static
100  {
101  ThreadPool tp(2);
102 
103  // Schedule a large number of closures that each sleep for two seconds. This
104  // will keep the thread pool busy for much longer than the default test timeout.
105  for (int i = 0; i < 1000; ++i) {
106  tp.Schedule([]() { std::this_thread::sleep_for(std::chrono::milliseconds(2000)); });
107  }
108 
109  // Cancel the processing of all the closures that are still pending.
110  tp.Cancel();
111 }
int i
Definition: BiCGSTAB_step_by_step.cpp:9
Definition: NonBlockingThreadPool.h:19

References Eigen::ThreadPoolTempl< Environment >::Cancel(), i, and Eigen::ThreadPoolTempl< Environment >::Schedule().

Referenced by EIGEN_DECLARE_TEST().

◆ test_create_destroy_empty_pool()

static void test_create_destroy_empty_pool ( )
static
15  {
16  // Just create and destroy the pool. This will wind up and tear down worker
17  // threads. Ensure there are no issues in that logic.
18  for (int i = 0; i < 16; ++i) {
19  ThreadPool tp(i);
20  }
21 }

References i.

Referenced by EIGEN_DECLARE_TEST().

◆ test_parallelism()

static void test_parallelism ( bool  allow_spinning)
static
23  {
24  // Test that we never fail to match available tasks with idle threads.
25  const int kThreads = 16; // code below expects that this is a multiple of 4
26  ThreadPool tp(kThreads, allow_spinning);
27  VERIFY_IS_EQUAL(tp.NumThreads(), kThreads);
28  VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1);
29  for (int iter = 0; iter < 100; ++iter) {
30  std::atomic<int> running(0);
31  std::atomic<int> done(0);
32  std::atomic<int> phase(0);
33  // Schedule kThreads tasks and ensure that they all are running.
34  for (int i = 0; i < kThreads; ++i) {
35  tp.Schedule([&]() {
36  const int thread_id = tp.CurrentThreadId();
37  VERIFY_GE(thread_id, 0);
38  VERIFY_LE(thread_id, kThreads - 1);
39  running++;
40  while (phase < 1) {
41  }
42  done++;
43  });
44  }
45  while (running != kThreads) {
46  }
47  running = 0;
48  phase = 1;
49  // Now, while the previous tasks exit, schedule another kThreads tasks and
50  // ensure that they are running.
51  for (int i = 0; i < kThreads; ++i) {
52  tp.Schedule([&, i]() {
53  running++;
54  while (phase < 2) {
55  }
56  // When all tasks are running, half of the tasks exit, a quarter of the tasks
57  // continue running, and a quarter of the tasks schedule another 2 tasks each.
58  // Concurrently, the main thread schedules another quarter of the tasks.
59  // This gives us another kThreads tasks, and we ensure that they all
60  // are running.
61  if (i < kThreads / 2) {
62  } else if (i < 3 * kThreads / 4) {
63  running++;
64  while (phase < 3) {
65  }
66  done++;
67  } else {
68  for (int j = 0; j < 2; ++j) {
69  tp.Schedule([&]() {
70  running++;
71  while (phase < 3) {
72  }
73  done++;
74  });
75  }
76  }
77  done++;
78  });
79  }
80  while (running != kThreads) {
81  }
82  running = 0;
83  phase = 2;
84  for (int i = 0; i < kThreads / 4; ++i) {
85  tp.Schedule([&]() {
86  running++;
87  while (phase < 3) {
88  }
89  done++;
90  });
91  }
92  while (running != kThreads) {
93  }
94  phase = 3;
95  while (done != 3 * kThreads) {
96  }
97  }
98 }
#define VERIFY_GE(a, b)
Definition: main.h:364
#define VERIFY_IS_EQUAL(a, b)
Definition: main.h:367
#define VERIFY_LE(a, b)
Definition: main.h:365
std::ptrdiff_t j
Definition: tut_arithmetic_redux_minmax.cpp:2

References Eigen::ThreadPoolTempl< Environment >::CurrentThreadId(), i, j, Eigen::ThreadPoolTempl< Environment >::NumThreads(), Eigen::ThreadPoolTempl< Environment >::Schedule(), VERIFY_GE, VERIFY_IS_EQUAL, and VERIFY_LE.

Referenced by EIGEN_DECLARE_TEST().

◆ test_pool_partitions()

static void test_pool_partitions ( )
static
113  {
114  const int kThreads = 2;
115 
116  std::atomic<int> running(0);
117  std::atomic<int> done(0);
118  std::atomic<int> phase(0);
119 
120  {
121  ThreadPool tp(kThreads);
122 
123  // Assign each thread to its own partition, so that stealing other work only
124  // occurs globally when a thread is idle.
125  std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
126  for (int i = 0; i < kThreads; ++i) {
127  steal_partitions[i] = std::make_pair(i, i + 1);
128  }
129  tp.SetStealPartitions(steal_partitions);
130 
131  // Schedule kThreads tasks and ensure that they all are running.
132  for (int i = 0; i < kThreads; ++i) {
133  tp.Schedule([&]() {
134  const int thread_id = tp.CurrentThreadId();
135  VERIFY_GE(thread_id, 0);
136  VERIFY_LE(thread_id, kThreads - 1);
137  ++running;
138  while (phase < 1) {
139  }
140  ++done;
141  });
142  }
143  while (running != kThreads) {
144  }
145  // Schedule each closure to only run on thread 'i' and verify that it does.
146  for (int i = 0; i < kThreads; ++i) {
147  tp.ScheduleWithHint(
148  [&, i]() {
149  ++running;
150  const int thread_id = tp.CurrentThreadId();
151  VERIFY_IS_EQUAL(thread_id, i);
152  while (phase < 2) {
153  }
154  ++done;
155  },
156  i, i + 1);
157  }
158  running = 0;
159  phase = 1;
160  while (running != kThreads) {
161  }
162  running = 0;
163  phase = 2;
164  }
165 }

References Eigen::ThreadPoolTempl< Environment >::CurrentThreadId(), i, Eigen::ThreadPoolTempl< Environment >::Schedule(), Eigen::ThreadPoolTempl< Environment >::ScheduleWithHint(), Eigen::ThreadPoolTempl< Environment >::SetStealPartitions(), VERIFY_GE, VERIFY_IS_EQUAL, and VERIFY_LE.

Referenced by EIGEN_DECLARE_TEST().