Skip to content

Commit

Permalink
API to use subset of pthreadpool threads to do the work.
Browse files Browse the repository at this point in the history
Summary:
This diff splits the command queue and instead uses commands
specific to each thread.

This enables:
- Waking up only the subset of threads needed.
- Waiting for only that subset of threads.

In this commit, the number of threads to use is a thread-local variable.
A subsequent commit makes it a property of the threadpool object.

Test Plan:
pthreadpool-test

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
kimishpatel committed Nov 18, 2021
1 parent 1787867 commit fa30cac
Show file tree
Hide file tree
Showing 9 changed files with 738 additions and 234 deletions.
19 changes: 19 additions & 0 deletions include/pthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ pthreadpool_t pthreadpool_create(size_t threads_count);
*/
size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);

/**
 * Limit the number of threads used for subsequently submitted tasks, so that
 * work can be done with fewer threads than are available in the threadpool.
 *
 * The purpose is to ameliorate performance degradation observed when the OS
 * maps a given set of threads onto fewer cores.
 *
 * NOTE(review): in the gcd.c implementation the value is stored in a
 * thread_local variable, so the setting appears to apply only to tasks
 * submitted from the calling thread - confirm for the other backends.
 *
 * @param num_threads  the number of threads to use for subsequently
 *    submitted tasks.
 */
void pthreadpool_set_num_threads_to_use(size_t num_threads);

/**
 * Query the current setting of the number of threads to use.
 *
 * @returns The number of threads to be used for subsequently submitted
 *    tasks.
 */
size_t pthreadpool_get_num_threads_to_use(void);

/**
* Process items on a 1D grid.
*
Expand Down
42 changes: 21 additions & 21 deletions src/fastpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath(
const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -77,7 +77,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath(
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -113,7 +113,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath(
const pthreadpool_task_1d_tile_1d_t task = (pthreadpool_task_1d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -155,7 +155,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath(
const pthreadpool_task_2d_t task = (pthreadpool_task_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -201,7 +201,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath(
const pthreadpool_task_2d_tile_1d_t task = (pthreadpool_task_2d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -251,7 +251,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath(
const pthreadpool_task_2d_tile_2d_t task = (pthreadpool_task_2d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -313,7 +313,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -366,7 +366,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath(
const pthreadpool_task_3d_t task = (pthreadpool_task_3d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -419,7 +419,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath(
const pthreadpool_task_3d_tile_1d_t task = (pthreadpool_task_3d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -476,7 +476,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath(
const pthreadpool_task_3d_tile_2d_t task = (pthreadpool_task_3d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -546,7 +546,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -607,7 +607,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath(
const pthreadpool_task_4d_t task = (pthreadpool_task_4d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -668,7 +668,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath(
const pthreadpool_task_4d_tile_1d_t task = (pthreadpool_task_4d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -733,7 +733,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath(
const pthreadpool_task_4d_tile_2d_t task = (pthreadpool_task_4d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -810,7 +810,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -878,7 +878,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath(
const pthreadpool_task_5d_t task = (pthreadpool_task_5d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -946,7 +946,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath(
const pthreadpool_task_5d_tile_1d_t task = (pthreadpool_task_5d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1019,7 +1019,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath(
const pthreadpool_task_5d_tile_2d_t task = (pthreadpool_task_5d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1095,7 +1095,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath(
const pthreadpool_task_6d_t task = (pthreadpool_task_6d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1171,7 +1171,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath(
const pthreadpool_task_6d_tile_1d_t task = (pthreadpool_task_6d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1252,7 +1252,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath(
const pthreadpool_task_6d_tile_2d_t task = (pthreadpool_task_6d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = threadpool->threads_count.value;
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down
14 changes: 13 additions & 1 deletion src/gcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "threadpool-object.h"
#include "threadpool-utils.h"

/*
 * Upper bound on the number of threads used for submitted work; written by
 * pthreadpool_set_num_threads_to_use() and read back by
 * pthreadpool_get_num_threads_to_use(). Declared thread_local, so every
 * thread has its own copy of the setting.
 *
 * NOTE(review): the variable is a size_t but is initialized with UINT_MAX
 * rather than SIZE_MAX - presumably intended to mean "no limit"; confirm.
 */
thread_local size_t max_num_threads = UINT_MAX;

static void thread_main(void* arg, size_t thread_index) {
struct pthreadpool* threadpool = (struct pthreadpool*) arg;
struct thread_info* thread = &threadpool->threads[thread_index];
Expand Down Expand Up @@ -62,6 +64,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
}
threadpool->threads_count = fxdiv_init_size_t(threads_count);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
for (size_t tid = 0; tid < threads_count; tid++) {
threadpool->threads[tid].thread_number = tid;
}
Expand All @@ -73,6 +76,14 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return threadpool;
}

/*
 * Record the requested thread limit for subsequently submitted tasks.
 *
 * The value is stored as-is in the thread_local max_num_threads variable;
 * no validation or clamping is performed here.
 *
 * @param num_threads  the number of threads to use for subsequently
 *    submitted tasks.
 */
void pthreadpool_set_num_threads_to_use(size_t num_threads) {
	const size_t requested_limit = num_threads;
	max_num_threads = requested_limit;
}

/*
 * Report the thread limit currently stored in max_num_threads.
 *
 * The parameter list is spelled (void) so that this definition is a proper
 * prototype and agrees with the declaration in pthreadpool.h; an empty ()
 * list in C leaves the parameters unspecified.
 *
 * @returns The number of threads to be used for subsequently submitted
 *    tasks, exactly as last stored by pthreadpool_set_num_threads_to_use()
 *    (or the initial value if it was never called on this thread).
 */
size_t pthreadpool_get_num_threads_to_use(void) {
	return max_num_threads;
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
struct pthreadpool* threadpool,
thread_function_t thread_function,
Expand All @@ -98,7 +109,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const struct fxdiv_result_size_tsize_t threads_count =
fxdiv_init_size_t(min(threadpool->threads_count.value, pthreadpool_get_num_threads_to_use()));

if (params_size != 0) {
memcpy(&threadpool->params, params, params_size);
Expand Down
Loading

0 comments on commit fa30cac

Please sign in to comment.