#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {
// Runs an arbitrary function and then calls Notify() on the passed in Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
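// --- Illustrative sketch (not part of the Eigen sources) --------------------
// The abstract Allocator above lets callers route ThreadPoolDevice memory
// through their own scheme. The class below is a hypothetical example: it
// forwards to Eigen's aligned heap routines while counting the total number
// of bytes requested. Only allocate() and deallocate() are required; the
// class name and the counter are assumptions for illustration.
class CountingAllocator : public Allocator {
 public:
  void* allocate(size_t num_bytes) const override {
    total_allocated_ += num_bytes;          // cumulative bytes requested so far
    return internal::aligned_malloc(num_bytes);
  }
  void deallocate(void* buffer) const override {
    internal::aligned_free(buffer);
  }
  size_t totalAllocated() const { return total_allocated_.load(); }

 private:
  // The interface methods are const, so the counter must be mutable.
  mutable std::atomic<size_t> total_allocated_{0};
};
// -----------------------------------------------------------------------------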
// Build a thread pool device on top of an existing pool of threads. The
// ownership of the thread pool remains with the caller.
struct ThreadPoolDevice {
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }
  template <typename Type>
  EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to the threads competing for memory bandwidth, so we
    // statically schedule at most 4 block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last num_threads - 1 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on the main thread, then wait for the others.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }

  // Number of threads available in the underlying thread pool. This number
  // can be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }
  // parallelFor executes f with [0, n) arguments in parallel and waits for
  // completion. f accepts a half-open interval [firstIdx, lastIdx). The block
  // size is chosen based on the iteration cost and the resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
      // Compute small problems directly in the caller thread.
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide the range into halves until we reach block.size.
    // The division rounds the midpoint to a multiple of block.size, so we are
    // guaranteed to get exactly block.count leaves that do actual computation.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on another thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less: execute directly and notify the barrier.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread (of the caller).
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }
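  // Editorial illustration (assumed values, not from the Eigen sources): with
  // n = 1000 and block.size = 100, block.count is 10. handleRange(0, 1000)
  // schedules [500, 1000), [300, 500), [200, 300) and [100, 200) on the pool
  // while shrinking its own range, then runs f(0, 100) directly. Every
  // scheduled range splits the same way, so exactly block.count leaves call f
  // and notify the Barrier that parallelFor waits on before returning.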
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
  // parallelForAsync executes f with [0, n) arguments in parallel without
  // waiting for completion. When the last block finishes, it calls the 'done'
  // callback. f accepts a half-open interval [firstIdx, lastIdx).
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide the range into halves until we reach block.size,
    // exactly as in the synchronous parallelFor above.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on another thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less: execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on
      // the main thread (of the caller).
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more
      // than numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }
 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync we must keep the passed in closures on the heap and
  // delete them only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };
  // Calculates the block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks that are not too small, to mitigate
  // parallelization overheads, and not too large, to mitigate tail effects and
  // potential load imbalance; we also want the number of blocks to divide
  // evenly across threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides the range into a smaller
      // number of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Take the coarser block size, but keep searching.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
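// --- Illustrative usage sketch (not part of the header) ---------------------
// A minimal example of driving ThreadPoolDevice directly, assuming the header
// is pulled in through the unsupported Tensor module with EIGEN_USE_THREADS
// defined. The thread count, vector size and per-element cost figures are
// arbitrary illustration values, not recommendations.
#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <vector>

int main() {
  Eigen::ThreadPool pool(4);                       // underlying worker threads
  Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/4);

  std::vector<float> data(1 << 20, 1.0f);
  const Eigen::Index n = static_cast<Eigen::Index>(data.size());
  // Per-element cost estimate: 4 bytes loaded, 4 bytes stored, 1 compute cycle.
  const Eigen::TensorOpCost cost(4.0, 4.0, 1.0);

  // Synchronous: returns only after every [firstIdx, lastIdx) block has run.
  device.parallelFor(n, cost,
                     [&data](Eigen::Index first, Eigen::Index last) {
                       for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
                     });

  // Asynchronous: returns immediately; the `done` callback fires after the
  // last block finishes, so wait on a Barrier before using the result.
  Eigen::Barrier done(1);
  device.parallelForAsync(n, cost,
                          [&data](Eigen::Index first, Eigen::Index last) {
                            for (Eigen::Index i = first; i < last; ++i) data[i] += 1.0f;
                          },
                          [&done]() { done.Notify(); });
  done.Wait();
  return 0;
}
// -----------------------------------------------------------------------------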