Skip to content

Commit

Permalink
Use pthreadpool object to store max number of threads to use
Browse files Browse the repository at this point in the history
Summary:
This commit changes the API for setting the maximum number of threads to use.
The limit is now stored in, and applied through, the pthreadpool object.

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
kimishpatel committed Nov 22, 2021
1 parent fa30cac commit 2a8a028
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 238 deletions.
19 changes: 12 additions & 7 deletions include/pthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,31 @@ pthreadpool_t pthreadpool_create(size_t threads_count);
*
* @returns The number of threads in the thread pool.
*/
size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);
size_t pthreadpool_get_max_threads_count(pthreadpool_t threadpool);

/*
* API to enable doing work with fewer threads than available in
* threadpool.
* This API takes in a pointer to threadpool object and sets number
* of threads to use with that threadpool.
* Purpose of this is to ameliorate some perf degradation observed
* due to OS mapping a given set of threads to fewer cores.
*
* @param threadpool threadpool object.
* @param num_threads num threads to use for the subsequent tasks
* submitted.
* submitted using the threadpool object.
*/
void pthreadpool_set_num_threads_to_use(size_t num_threads);
void pthreadpool_set_threads_count(pthreadpool_t threadpool, size_t num_threads);

/*
* Query current setting of the number of threads to use
* API to get number of threads to be used via threadpool. This number
* can be different from the size of the threadpool. It may have been set
* by pthreadpool_set_threads_count.
*
* @returns The number of threads to be used for the subsequent tasks
* submitted.
* @param threadpool threadpool object.
* @returns number of threads used by threadpool.
*/
size_t pthreadpool_get_num_threads_to_use(void);
size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);

/**
* Process items on a 1D grid.
Expand Down
42 changes: 21 additions & 21 deletions src/fastpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath(
const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -77,7 +77,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath(
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -113,7 +113,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath(
const pthreadpool_task_1d_tile_1d_t task = (pthreadpool_task_1d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -155,7 +155,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath(
const pthreadpool_task_2d_t task = (pthreadpool_task_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -201,7 +201,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath(
const pthreadpool_task_2d_tile_1d_t task = (pthreadpool_task_2d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -251,7 +251,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath(
const pthreadpool_task_2d_tile_2d_t task = (pthreadpool_task_2d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -313,7 +313,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -366,7 +366,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath(
const pthreadpool_task_3d_t task = (pthreadpool_task_3d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -419,7 +419,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath(
const pthreadpool_task_3d_tile_1d_t task = (pthreadpool_task_3d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -476,7 +476,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath(
const pthreadpool_task_3d_tile_2d_t task = (pthreadpool_task_3d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -546,7 +546,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -607,7 +607,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath(
const pthreadpool_task_4d_t task = (pthreadpool_task_4d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -668,7 +668,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath(
const pthreadpool_task_4d_tile_1d_t task = (pthreadpool_task_4d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -733,7 +733,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath(
const pthreadpool_task_4d_tile_2d_t task = (pthreadpool_task_4d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -810,7 +810,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -878,7 +878,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath(
const pthreadpool_task_5d_t task = (pthreadpool_task_5d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -946,7 +946,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath(
const pthreadpool_task_5d_tile_1d_t task = (pthreadpool_task_5d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1019,7 +1019,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath(
const pthreadpool_task_5d_tile_2d_t task = (pthreadpool_task_5d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1095,7 +1095,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath(
const pthreadpool_task_6d_t task = (pthreadpool_task_6d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1171,7 +1171,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath(
const pthreadpool_task_6d_tile_1d_t task = (pthreadpool_task_6d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1252,7 +1252,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath(
const pthreadpool_task_6d_tile_2d_t task = (pthreadpool_task_6d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down
34 changes: 16 additions & 18 deletions src/gcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include "threadpool-object.h"
#include "threadpool-utils.h"

thread_local size_t max_num_threads = UINT_MAX;

static void thread_main(void* arg, size_t thread_index) {
struct pthreadpool* threadpool = (struct pthreadpool*) arg;
struct thread_info* thread = &threadpool->threads[thread_index];
Expand All @@ -44,8 +42,8 @@ static void thread_main(void* arg, size_t thread_index) {
}
}

struct pthreadpool* pthreadpool_create(size_t threads_count) {
if (threads_count == 0) {
struct pthreadpool* pthreadpool_create(size_t max_threads_count) {
if (max_threads_count == 0) {
int threads = 1;
size_t sizeof_threads = sizeof(threads);
if (sysctlbyname("hw.logicalcpu_max", &threads, &sizeof_threads, NULL, 0) != 0) {
Expand All @@ -56,32 +54,32 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
}

threads_count = (size_t) threads;
max_threads_count = (size_t) threads;
}

struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
struct pthreadpool* threadpool = pthreadpool_allocate(max_threads_count);
if (threadpool == NULL) {
return NULL;
}
threadpool->threads_count = fxdiv_init_size_t(threads_count);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
for (size_t tid = 0; tid < threads_count; tid++) {
threadpool->max_threads_count = fxdiv_init_size_t(max_threads_count);
pthreadpool_store_relaxed_size_t(&threadpool->threads_count, max_threads_count);
for (size_t tid = 0; tid < max_threads_count; tid++) {
threadpool->threads[tid].thread_number = tid;
}

/* Thread pool with a single thread computes everything on the caller thread. */
if (threads_count > 1) {
if (max_threads_count > 1) {
threadpool->execution_semaphore = dispatch_semaphore_create(1);
}
return threadpool;
}

void pthreadpool_set_num_threads_to_use(size_t num_threads) {
max_num_threads = num_threads;
}

size_t pthreadpool_get_num_threads_to_use() {
return max_num_threads;
/*
 * Sets the number of threads used for tasks subsequently submitted through
 * the given threadpool.
 *
 * @param threadpool   threadpool object to update.
 * @param num_threads  requested number of threads. The stored value is
 *                     clamped to the range [1, max_threads_count] of the
 *                     threadpool.
 */
void pthreadpool_set_threads_count(struct pthreadpool* threadpool, size_t num_threads) {
	/* max_threads_count is written once in pthreadpool_create, so it is read without the lock. */
	const size_t max_threads_count = threadpool->max_threads_count.value;

	size_t num_threads_to_use = min(max_threads_count, num_threads);
	if (num_threads_to_use == 0) {
		/*
		 * pthreadpool_parallelize builds an fxdiv divisor from the stored
		 * count, so zero must never be stored; fall back to a single thread.
		 */
		num_threads_to_use = 1;
	}

	if (max_threads_count <= 1) {
		/*
		 * pthreadpool_create only creates execution_semaphore for pools with
		 * more than one thread, so there is no semaphore to wait on here.
		 */
		pthreadpool_store_relaxed_size_t(&threadpool->threads_count, num_threads_to_use);
		return;
	}

	/* Hold the execution semaphore while the count is updated. */
	dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
	pthreadpool_store_relaxed_size_t(&threadpool->threads_count, num_threads_to_use);
	dispatch_semaphore_signal(threadpool->execution_semaphore);
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
Expand Down Expand Up @@ -109,8 +107,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const struct fxdiv_result_size_tsize_t threads_count =
fxdiv_init_size_t(min(threadpool->threads_count.value, pthreadpool_get_num_threads_to_use()));
const struct fxdiv_divisor_size_t threads_count =
fxdiv_init_size_t(pthreadpool_load_relaxed_size_t(&threadpool->threads_count));

if (params_size != 0) {
memcpy(&threadpool->params, params, params_size);
Expand Down
8 changes: 4 additions & 4 deletions src/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@


PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate(
size_t threads_count)
size_t max_threads_count)
{
assert(threads_count >= 1);
assert(max_threads_count >= 1);

const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info);
const size_t threadpool_size = sizeof(struct pthreadpool) + max_threads_count * sizeof(struct thread_info);
struct pthreadpool* threadpool = NULL;
#if defined(__ANDROID__)
/*
Expand Down Expand Up @@ -55,7 +55,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_deallocate(
{
assert(threadpool != NULL);

const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->threads_count.value * sizeof(struct thread_info);
const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->max_threads_count.value * sizeof(struct thread_info);
memset(threadpool, 0, threadpool_size);

#ifdef _WIN32
Expand Down

0 comments on commit 2a8a028

Please sign in to comment.