// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
11 #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
12 
13 #ifdef EIGEN_AVOID_THREAD_LOCAL
14 
15 #ifdef EIGEN_THREAD_LOCAL
16 #undef EIGEN_THREAD_LOCAL
17 #endif
18 
19 #else
20 
21 #if ((EIGEN_COMP_GNUC) || __has_feature(cxx_thread_local) || EIGEN_COMP_MSVC)
22 #define EIGEN_THREAD_LOCAL static thread_local
23 #endif
24 
25 // Disable TLS for Apple and Android builds with older toolchains.
26 #if defined(__APPLE__)
27 // Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
28 // __IPHONE_8_0.
29 #include <Availability.h>
30 #include <TargetConditionals.h>
31 #endif
32 // Checks whether C++11's `thread_local` storage duration specifier is
33 // supported.
34 #if EIGEN_COMP_CLANGAPPLE && \
35  ((EIGEN_COMP_CLANGAPPLE < 8000042) || (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
36 // Notes: Xcode's clang did not support `thread_local` until version
37 // 8, and even then not for all iOS < 9.0.
38 #undef EIGEN_THREAD_LOCAL
39 
40 #elif defined(__ANDROID__) && EIGEN_COMP_CLANG
41 // There are platforms for which TLS should not be used even though the compiler
42 // makes it seem like it's supported (Android NDK < r12b for example).
43 // This is primarily because of linker problems and toolchain misconfiguration:
44 // TLS isn't supported until NDK r12b per
45 // https://developer.android.com/ndk/downloads/revision_history.html
46 // Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
47 // <android/ndk-version.h>. For NDK < r16, users should define these macros,
48 // e.g. `-D__NDK_MAJOR__=11 -D__NKD_MINOR__=0` for NDK r11.
49 #if __has_include(<android/ndk-version.h>)
50 #include <android/ndk-version.h>
51 #endif // __has_include(<android/ndk-version.h>)
52 #if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && defined(__NDK_MINOR__) && \
53  ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
54 #undef EIGEN_THREAD_LOCAL
55 #endif
56 #endif // defined(__ANDROID__) && defined(__clang__)
57 
58 #endif // EIGEN_AVOID_THREAD_LOCAL
59 
60 // IWYU pragma: private
61 #include "./InternalHeaderCheck.h"
62 
63 namespace Eigen {
64 
namespace internal {

// Default `initialize` callback for ThreadLocal: leaves the
// default-constructed T untouched.
template <typename T>
struct ThreadLocalNoOpInitialize {
  void operator()(T&) const {}
};

// Default `release` callback for ThreadLocal: invoked before ~T(),
// does nothing.
template <typename T>
struct ThreadLocalNoOpRelease {
  void operator()(T&) const {}
};

}  // namespace internal
77 
78 // Thread local container for elements of type T, that does not use thread local
79 // storage. As long as the number of unique threads accessing this storage
80 // is smaller than `capacity_`, it is lock-free and wait-free. Otherwise it will
81 // use a mutex for synchronization.
82 //
83 // Type `T` has to be default constructible, and by default each thread will get
84 // a default constructed value. It is possible to specify custom `initialize`
85 // callable, that will be called lazily from each thread accessing this object,
86 // and will be passed a default initialized object of type `T`. Also it's
87 // possible to pass a custom `release` callable, that will be invoked before
88 // calling ~T().
89 //
90 // Example:
91 //
92 // struct Counter {
93 // int value = 0;
94 // }
95 //
96 // Eigen::ThreadLocal<Counter> counter(10);
97 //
98 // // Each thread will have access to it's own counter object.
99 // Counter& cnt = counter.local();
100 // cnt++;
101 //
102 // WARNING: Eigen::ThreadLocal uses the OS-specific value returned by
103 // std::this_thread::get_id() to identify threads. This value is not guaranteed
104 // to be unique except for the life of the thread. A newly created thread may
105 // get an OS-specific ID equal to that of an already destroyed thread.
106 //
107 // Somewhat similar to TBB thread local storage, with similar restrictions:
108 // https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html
109 //
110 template <typename T, typename Initialize = internal::ThreadLocalNoOpInitialize<T>,
111  typename Release = internal::ThreadLocalNoOpRelease<T>>
112 class ThreadLocal {
113  // We preallocate default constructed elements in MaxSizedVector.
114  static_assert(std::is_default_constructible<T>::value, "ThreadLocal data type must be default constructible");
115 
116  public:
117  explicit ThreadLocal(int capacity)
118  : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(), internal::ThreadLocalNoOpRelease<T>()) {}
119 
120  ThreadLocal(int capacity, Initialize initialize)
121  : ThreadLocal(capacity, std::move(initialize), internal::ThreadLocalNoOpRelease<T>()) {}
122 
123  ThreadLocal(int capacity, Initialize initialize, Release release)
124  : initialize_(std::move(initialize)),
125  release_(std::move(release)),
126  capacity_(capacity),
127  data_(capacity_),
128  ptr_(capacity_),
129  filled_records_(0) {
130  eigen_assert(capacity_ >= 0);
131  data_.resize(capacity_);
132  for (int i = 0; i < capacity_; ++i) {
133  ptr_.emplace_back(nullptr);
134  }
135  }
136 
137  T& local() {
138  std::thread::id this_thread = std::this_thread::get_id();
139  if (capacity_ == 0) return SpilledLocal(this_thread);
140 
141  std::size_t h = std::hash<std::thread::id>()(this_thread);
142  const int start_idx = h % capacity_;
143 
144  // NOTE: From the definition of `std::this_thread::get_id()` it is
145  // guaranteed that we never can have concurrent insertions with the same key
146  // to our hash-map like data structure. If we didn't find an element during
147  // the initial traversal, it's guaranteed that no one else could have
148  // inserted it while we are in this function. This allows to massively
149  // simplify out lock-free insert-only hash map.
150 
151  // Check if we already have an element for `this_thread`.
152  int idx = start_idx;
153  while (ptr_[idx].load() != nullptr) {
154  ThreadIdAndValue& record = *(ptr_[idx].load());
155  if (record.thread_id == this_thread) return record.value;
156 
157  idx += 1;
158  if (idx >= capacity_) idx -= capacity_;
159  if (idx == start_idx) break;
160  }
161 
162  // If we are here, it means that we found an insertion point in lookup
163  // table at `idx`, or we did a full traversal and table is full.
164 
165  // If lock-free storage is full, fallback on mutex.
166  if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);
167 
168  // We double check that we still have space to insert an element into a lock
169  // free storage. If old value in `filled_records_` is larger than the
170  // records capacity, it means that some other thread added an element while
171  // we were traversing lookup table.
172  int insertion_index = filled_records_.fetch_add(1, std::memory_order_relaxed);
173  if (insertion_index >= capacity_) return SpilledLocal(this_thread);
174 
175  // At this point it's guaranteed that we can access to
176  // data_[insertion_index_] without a data race.
177  data_[insertion_index].thread_id = this_thread;
178  initialize_(data_[insertion_index].value);
179 
180  // That's the pointer we'll put into the lookup table.
181  ThreadIdAndValue* inserted = &data_[insertion_index];
182 
183  // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop.
184  ThreadIdAndValue* empty = nullptr;
185 
186  // Now we have to find an insertion point into the lookup table. We start
187  // from the `idx` that was identified as an insertion point above, it's
188  // guaranteed that we will have an empty record somewhere in a lookup table
189  // (because we created a record in the `data_`).
190  const int insertion_idx = idx;
191 
192  do {
193  // Always start search from the original insertion candidate.
194  idx = insertion_idx;
195  while (ptr_[idx].load() != nullptr) {
196  idx += 1;
197  if (idx >= capacity_) idx -= capacity_;
198  // If we did a full loop, it means that we don't have any free entries
199  // in the lookup table, and this means that something is terribly wrong.
200  eigen_assert(idx != insertion_idx);
201  }
202  // Atomic CAS of the pointer guarantees that any other thread, that will
203  // follow this pointer will see all the mutations in the `data_`.
204  } while (!ptr_[idx].compare_exchange_weak(empty, inserted));
205 
206  return inserted->value;
207  }
208 
209  // WARN: It's not thread safe to call it concurrently with `local()`.
210  void ForEach(std::function<void(std::thread::id, T&)> f) {
211  // Reading directly from `data_` is unsafe, because only CAS to the
212  // record in `ptr_` makes all changes visible to other threads.
213  for (auto& ptr : ptr_) {
214  ThreadIdAndValue* record = ptr.load();
215  if (record == nullptr) continue;
216  f(record->thread_id, record->value);
217  }
218 
219  // We did not spill into the map based storage.
220  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
221 
222  // Adds a happens before edge from the last call to SpilledLocal().
223  EIGEN_MUTEX_LOCK lock(mu_);
224  for (auto& kv : per_thread_map_) {
225  f(kv.first, kv.second);
226  }
227  }
228 
229  // WARN: It's not thread safe to call it concurrently with `local()`.
231  // Reading directly from `data_` is unsafe, because only CAS to the record
232  // in `ptr_` makes all changes visible to other threads.
233  for (auto& ptr : ptr_) {
234  ThreadIdAndValue* record = ptr.load();
235  if (record == nullptr) continue;
236  release_(record->value);
237  }
238 
239  // We did not spill into the map based storage.
240  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
241 
242  // Adds a happens before edge from the last call to SpilledLocal().
243  EIGEN_MUTEX_LOCK lock(mu_);
244  for (auto& kv : per_thread_map_) {
245  release_(kv.second);
246  }
247  }
248 
249  private:
251  std::thread::id thread_id;
253  };
254 
255  // Use unordered map guarded by a mutex when lock free storage is full.
256  T& SpilledLocal(std::thread::id this_thread) {
257  EIGEN_MUTEX_LOCK lock(mu_);
258 
259  auto it = per_thread_map_.find(this_thread);
260  if (it == per_thread_map_.end()) {
261  auto result = per_thread_map_.emplace(this_thread, T());
262  eigen_assert(result.second);
263  initialize_((*result.first).second);
264  return (*result.first).second;
265  } else {
266  return it->second;
267  }
268  }
269 
270  Initialize initialize_;
271  Release release_;
272  const int capacity_;
273 
274  // Storage that backs lock-free lookup table `ptr_`. Records stored in this
275  // storage contiguously starting from index 0.
277 
278  // Atomic pointers to the data stored in `data_`. Used as a lookup table for
279  // linear probing hash map (https://en.wikipedia.org/wiki/Linear_probing).
281 
282  // Number of records stored in the `data_`.
283  std::atomic<int> filled_records_;
284 
285  // We fallback on per thread map if lock-free storage is full. In practice
286  // this should never happen, if `capacity_` is a reasonable estimate of the
287  // number of threads running in a system.
288  EIGEN_MUTEX mu_; // Protects per_thread_map_.
289  std::unordered_map<std::thread::id, T> per_thread_map_;
290 };
291 
292 } // namespace Eigen
293 
294 #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
int i
Definition: BiCGSTAB_step_by_step.cpp:9
Eigen::Triplet< double > T
Definition: EigenUnitTest.cpp:11
#define eigen_assert(x)
Definition: Macros.h:910
void load(Archive &ar, ParticleHandler &handl)
Definition: Particles.h:21
The MaxSizeVector class.
Definition: MaxSizeVector.h:31
Definition: ThreadLocal.h:112
~ThreadLocal()
Definition: ThreadLocal.h:230
Release release_
Definition: ThreadLocal.h:271
T & SpilledLocal(std::thread::id this_thread)
Definition: ThreadLocal.h:256
T & local()
Definition: ThreadLocal.h:137
ThreadLocal(int capacity)
Definition: ThreadLocal.h:117
Initialize initialize_
Definition: ThreadLocal.h:270
ThreadLocal(int capacity, Initialize initialize)
Definition: ThreadLocal.h:120
void ForEach(std::function< void(std::thread::id, T &)> f)
Definition: ThreadLocal.h:210
MaxSizeVector< ThreadIdAndValue > data_
Definition: ThreadLocal.h:276
MaxSizeVector< std::atomic< ThreadIdAndValue * > > ptr_
Definition: ThreadLocal.h:280
EIGEN_MUTEX mu_
Definition: ThreadLocal.h:288
std::atomic< int > filled_records_
Definition: ThreadLocal.h:283
const int capacity_
Definition: ThreadLocal.h:272
ThreadLocal(int capacity, Initialize initialize, Release release)
Definition: ThreadLocal.h:123
std::unordered_map< std::thread::id, T > per_thread_map_
Definition: ThreadLocal.h:289
static int f(const TensorMap< Tensor< int, 3 > > &tensor)
Definition: cxx11_tensor_map.cpp:237
Namespace containing all symbols from the Eigen library.
Definition: bench_norm.cpp:70
squared absolute value
Definition: GlobalFunctions.h:87
Definition: Eigen_Colamd.h:49
Definition: ThreadLocal.h:250
T value
Definition: ThreadLocal.h:252
std::thread::id thread_id
Definition: ThreadLocal.h:251
Definition: ThreadLocal.h:67
void operator()(T &) const
Definition: ThreadLocal.h:68
Definition: ThreadLocal.h:72
void operator()(T &) const
Definition: ThreadLocal.h:73