Eigen::ThreadLocal< T, Initialize, Release > Class Template Reference

#include <ThreadLocal.h>

Classes

struct  ThreadIdAndValue
 

Public Member Functions

 ThreadLocal (int capacity)
 
 ThreadLocal (int capacity, Initialize initialize)
 
 ThreadLocal (int capacity, Initialize initialize, Release release)
 
T & local ()
 
void ForEach (std::function< void(std::thread::id, T &)> f)
 
 ~ThreadLocal ()
 

Private Member Functions

T & SpilledLocal (std::thread::id this_thread)
 

Private Attributes

Initialize initialize_
 
Release release_
 
const int capacity_
 
MaxSizeVector< ThreadIdAndValue > data_
 
MaxSizeVector< std::atomic< ThreadIdAndValue * > > ptr_
 
std::atomic< int > filled_records_
 
EIGEN_MUTEX mu_
 
std::unordered_map< std::thread::id, T > per_thread_map_
 

Constructor & Destructor Documentation

◆ ThreadLocal() [1/3]

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::ThreadLocal ( int  capacity)
inline explicit
118  : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(), internal::ThreadLocalNoOpRelease<T>()) {}
ThreadLocal(int capacity)
Definition: ThreadLocal.h:117

◆ ThreadLocal() [2/3]

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::ThreadLocal ( int  capacity,
Initialize  initialize 
)
inline
121  : ThreadLocal(capacity, std::move(initialize), internal::ThreadLocalNoOpRelease<T>()) {}

◆ ThreadLocal() [3/3]

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::ThreadLocal ( int  capacity,
Initialize  initialize,
Release  release 
)
inline
124  : initialize_(std::move(initialize)),
125  release_(std::move(release)),
126  capacity_(capacity),
127  data_(capacity_),
128  ptr_(capacity_),
129  filled_records_(0) {
130  eigen_assert(capacity_ >= 0);
131  data_.resize(capacity_);
132  for (int i = 0; i < capacity_; ++i) {
133  ptr_.emplace_back(nullptr);
134  }
135  }
int i
Definition: BiCGSTAB_step_by_step.cpp:9
#define eigen_assert(x)
Definition: Macros.h:910
Release release_
Definition: ThreadLocal.h:271
Initialize initialize_
Definition: ThreadLocal.h:270
MaxSizeVector< ThreadIdAndValue > data_
Definition: ThreadLocal.h:276
MaxSizeVector< std::atomic< ThreadIdAndValue * > > ptr_
Definition: ThreadLocal.h:280
std::atomic< int > filled_records_
Definition: ThreadLocal.h:283
const int capacity_
Definition: ThreadLocal.h:272

References Eigen::ThreadLocal< T, Initialize, Release >::capacity_, Eigen::ThreadLocal< T, Initialize, Release >::data_, eigen_assert, i, and Eigen::ThreadLocal< T, Initialize, Release >::ptr_.

◆ ~ThreadLocal()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::~ThreadLocal ( )
inline
230  {
231  // Reading directly from `data_` is unsafe, because only CAS to the record
232  // in `ptr_` makes all changes visible to other threads.
233  for (auto& ptr : ptr_) {
234  ThreadIdAndValue* record = ptr.load();
235  if (record == nullptr) continue;
236  release_(record->value);
237  }
238 
239  // We did not spill into the map based storage.
240  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
241 
242  // Adds a happens before edge from the last call to SpilledLocal().
243  EIGEN_MUTEX_LOCK lock(mu_);
244  for (auto& kv : per_thread_map_) {
245  release_(kv.second);
246  }
247  }
EIGEN_MUTEX mu_
Definition: ThreadLocal.h:288
std::unordered_map< std::thread::id, T > per_thread_map_
Definition: ThreadLocal.h:289

References Eigen::ThreadLocal< T, Initialize, Release >::capacity_, Eigen::ThreadLocal< T, Initialize, Release >::filled_records_, Eigen::ThreadLocal< T, Initialize, Release >::mu_, Eigen::ThreadLocal< T, Initialize, Release >::per_thread_map_, Eigen::ThreadLocal< T, Initialize, Release >::ptr_, Eigen::ThreadLocal< T, Initialize, Release >::release_, and Eigen::ThreadLocal< T, Initialize, Release >::ThreadIdAndValue::value.

Member Function Documentation

◆ ForEach()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
void Eigen::ThreadLocal< T, Initialize, Release >::ForEach ( std::function< void(std::thread::id, T &)>  f)
inline
210  {
211  // Reading directly from `data_` is unsafe, because only CAS to the
212  // record in `ptr_` makes all changes visible to other threads.
213  for (auto& ptr : ptr_) {
214  ThreadIdAndValue* record = ptr.load();
215  if (record == nullptr) continue;
216  f(record->thread_id, record->value);
217  }
218 
219  // We did not spill into the map based storage.
220  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
221 
222  // Adds a happens before edge from the last call to SpilledLocal().
223  EIGEN_MUTEX_LOCK lock(mu_);
224  for (auto& kv : per_thread_map_) {
225  f(kv.first, kv.second);
226  }
227  }
static int f(const TensorMap< Tensor< int, 3 > > &tensor)
Definition: cxx11_tensor_map.cpp:237

References Eigen::ThreadLocal< T, Initialize, Release >::capacity_, f(), Eigen::ThreadLocal< T, Initialize, Release >::filled_records_, Eigen::ThreadLocal< T, Initialize, Release >::mu_, Eigen::ThreadLocal< T, Initialize, Release >::per_thread_map_, Eigen::ThreadLocal< T, Initialize, Release >::ptr_, Eigen::ThreadLocal< T, Initialize, Release >::ThreadIdAndValue::thread_id, and Eigen::ThreadLocal< T, Initialize, Release >::ThreadIdAndValue::value.

Referenced by test_large_number_of_tasks_no_spill(), test_large_number_of_tasks_with_spill(), test_simple_thread_local(), and test_zero_sized_thread_local().

◆ local()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
T& Eigen::ThreadLocal< T, Initialize, Release >::local ( )
inline
137  {
138  std::thread::id this_thread = std::this_thread::get_id();
139  if (capacity_ == 0) return SpilledLocal(this_thread);
140 
141  std::size_t h = std::hash<std::thread::id>()(this_thread);
142  const int start_idx = h % capacity_;
143 
144  // NOTE: From the definition of `std::this_thread::get_id()` it is
145  // guaranteed that we never can have concurrent insertions with the same key
146  // to our hash-map like data structure. If we didn't find an element during
147  // the initial traversal, it's guaranteed that no one else could have
148  // inserted it while we are in this function. This allows us to massively
149  // simplify our lock-free insert-only hash map.
150 
151  // Check if we already have an element for `this_thread`.
152  int idx = start_idx;
153  while (ptr_[idx].load() != nullptr) {
154  ThreadIdAndValue& record = *(ptr_[idx].load());
155  if (record.thread_id == this_thread) return record.value;
156 
157  idx += 1;
158  if (idx >= capacity_) idx -= capacity_;
159  if (idx == start_idx) break;
160  }
161 
162  // If we are here, it means that we found an insertion point in lookup
163  // table at `idx`, or we did a full traversal and table is full.
164 
165  // If lock-free storage is full, fallback on mutex.
166  if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);
167 
168  // We double check that we still have space to insert an element into a lock
169  // free storage. If old value in `filled_records_` is larger than or equal to
170  // the records capacity, it means that some other thread added an element
171  // while we were traversing lookup table.
172  int insertion_index = filled_records_.fetch_add(1, std::memory_order_relaxed);
173  if (insertion_index >= capacity_) return SpilledLocal(this_thread);
174 
175  // At this point it's guaranteed that we can access
176  // data_[insertion_index] without a data race.
177  data_[insertion_index].thread_id = this_thread;
178  initialize_(data_[insertion_index].value);
179 
180  // That's the pointer we'll put into the lookup table.
181  ThreadIdAndValue* inserted = &data_[insertion_index];
182 
183  // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop.
184  ThreadIdAndValue* empty = nullptr;
185 
186  // Now we have to find an insertion point into the lookup table. We start
187  // from the `idx` that was identified as an insertion point above, it's
188  // guaranteed that we will have an empty record somewhere in a lookup table
189  // (because we created a record in the `data_`).
190  const int insertion_idx = idx;
191 
192  do {
193  // Always start search from the original insertion candidate.
194  idx = insertion_idx;
195  while (ptr_[idx].load() != nullptr) {
196  idx += 1;
197  if (idx >= capacity_) idx -= capacity_;
198  // If we did a full loop, it means that we don't have any free entries
199  // in the lookup table, and this means that something is terribly wrong.
200  eigen_assert(idx != insertion_idx);
201  }
202  // Atomic CAS of the pointer guarantees that any other thread, that will
203  // follow this pointer will see all the mutations in the `data_`.
204  } while (!ptr_[idx].compare_exchange_weak(empty, inserted));
205 
206  return inserted->value;
207  }
void load(Archive &ar, ParticleHandler &handl)
Definition: Particles.h:21
T & SpilledLocal(std::thread::id this_thread)
Definition: ThreadLocal.h:256
squared absolute value
Definition: GlobalFunctions.h:87

References Eigen::ThreadLocal< T, Initialize, Release >::capacity_, Eigen::ThreadLocal< T, Initialize, Release >::data_, eigen_assert, Eigen::ThreadLocal< T, Initialize, Release >::filled_records_, Eigen::ThreadLocal< T, Initialize, Release >::initialize_, load(), Eigen::ThreadLocal< T, Initialize, Release >::ptr_, Eigen::ThreadLocal< T, Initialize, Release >::SpilledLocal(), Eigen::ThreadLocal< T, Initialize, Release >::ThreadIdAndValue::thread_id, Eigen::value, and Eigen::ThreadLocal< T, Initialize, Release >::ThreadIdAndValue::value.

Referenced by test_large_number_of_tasks_no_spill(), test_large_number_of_tasks_with_spill(), test_simple_thread_local(), and test_zero_sized_thread_local().

◆ SpilledLocal()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
T& Eigen::ThreadLocal< T, Initialize, Release >::SpilledLocal ( std::thread::id  this_thread)
inline private
256  {
257  EIGEN_MUTEX_LOCK lock(mu_);
258 
259  auto it = per_thread_map_.find(this_thread);
260  if (it == per_thread_map_.end()) {
261  auto result = per_thread_map_.emplace(this_thread, T());
262  eigen_assert(result.second);
263  initialize_((*result.first).second);
264  return (*result.first).second;
265  } else {
266  return it->second;
267  }
268  }
Eigen::Triplet< double > T
Definition: EigenUnitTest.cpp:11

References eigen_assert, Eigen::ThreadLocal< T, Initialize, Release >::initialize_, Eigen::ThreadLocal< T, Initialize, Release >::mu_, and Eigen::ThreadLocal< T, Initialize, Release >::per_thread_map_.

Referenced by Eigen::ThreadLocal< T, Initialize, Release >::local().

Member Data Documentation

◆ capacity_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
const int Eigen::ThreadLocal< T, Initialize, Release >::capacity_
private

◆ data_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
MaxSizeVector<ThreadIdAndValue> Eigen::ThreadLocal< T, Initialize, Release >::data_
private

◆ filled_records_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
std::atomic<int> Eigen::ThreadLocal< T, Initialize, Release >::filled_records_
private

◆ initialize_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Initialize Eigen::ThreadLocal< T, Initialize, Release >::initialize_
private

◆ mu_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
EIGEN_MUTEX Eigen::ThreadLocal< T, Initialize, Release >::mu_
private

◆ per_thread_map_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
std::unordered_map<std::thread::id, T> Eigen::ThreadLocal< T, Initialize, Release >::per_thread_map_
private

◆ ptr_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
MaxSizeVector<std::atomic<ThreadIdAndValue*> > Eigen::ThreadLocal< T, Initialize, Release >::ptr_
private

◆ release_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Release Eigen::ThreadLocal< T, Initialize, Release >::release_
private

The documentation for this class was generated from the following file: