Skip to content

Commit

Permalink
This commit fixes a bug in the first implementation of using a subset of
Browse files Browse the repository at this point in the history
threads.

Summary:
Two things addressed.
- Enter spin loop wait only if you qualified to participate in the last
command submitted.
- If you did not qualify to run a command but observed a new command
submission, ack via checking in.

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
kimishpatel committed Oct 28, 2021
1 parent 6705e4b commit 668abfa
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
4 changes: 0 additions & 4 deletions src/portable-api.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
#include "threadpool-object.h"
#include "threadpool-utils.h"

// forward declare function to get
// current value of capped_num_threads
size_t pthreadpool_get_capped_num_threads();

size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
if (threadpool == NULL) {
return 1;
Expand Down
30 changes: 28 additions & 2 deletions src/pthreads.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
* 4. Main thread submits command to a shared atomic command queue
* 5. Child threads in their waiting for command fn check if they are eligible
* to run this command. If not they continue to wait.
* One corner case here is:
* Thread 0 (always qualified): command submitted(1) --- wait for worker threads --- submit command(2) -- wait for worker threads -- submit command(3) -- wait for worker threads
* Thread 1 (always qualified): wait for command(1) -- run cmd -- active_threads-1 -- wait for next cmd(2) -- active_threads-1 -- wait for next cmd(3) -- active_threads - 1
* Thread 2 (!qualified for cmd 1 and 2): wait for command(1) -- not qualified ---------------------------------------------------- wait for next eligible command (3)
* Now above can happen if not ALL threads are synchronised before next command is submitted. Assume only 2 threads qualify for cmd 1 and 2 and all three for cmd 3.
* Master thread submits cmd 1. All 2 threads see it. Thread 2 sees the command but knows it is not qualified to run thus it will try to go to conditional wait.
* Now, since the master thread does not wait for thread 2, which does not participate in the command, it jumps to command 2.
* However thread 2 has not yet reached its conditional wait. Thus its last_command = cmd 1. Master thread submits cmd 2 and finishes and is waiting for cmd 3.
* At this point thread 2 starts waiting with last_command = cmd 1 because it never saw cmd 2. Master thread submits cmd 3. At cmd 3 all three threads are eligible.
* However thread 2 will never see cmd 3 because the masked bit is the same as for cmd 1 and it is not perceived as a new command.
* To fix this we must add this invariant:
* - Master thread must synchronize with all threads before submitting next command regardless of the eligibility of threads to participate in the work.
* Testing:
* 1. Create threadpool with 4 threads.
* 2. Run parallelize_1d task x with 64 elements to be processed.
Expand Down Expand Up @@ -183,12 +195,15 @@ static uint32_t wait_for_new_command(
return command;
}

if (command != last_command) {
checkin_worker_thread(threadpool);
}
// We should come here only if this thread was not eligible to process
// the new command recieved. In that case we must continue to wait to recieve
// next eligible command.
last_command = command;

if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
if (qualified_to_run_new_command && (last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
/* Spin-wait loop */
for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
pthreadpool_yield();
Expand All @@ -199,6 +214,9 @@ static uint32_t wait_for_new_command(
if (qualified_to_run_new_command && (command != last_command)) {
return command;
}
if (command != last_command) {
checkin_worker_thread(threadpool);
}
last_command = command;
}
}
Expand All @@ -214,6 +232,9 @@ static uint32_t wait_for_new_command(
* thread.
* In that case we will wait for change, via futex_wait, in the value of command
*/
if (command != last_command) {
checkin_worker_thread(threadpool);
}
last_command = command;
futex_wait(&threadpool->command, last_command);
// Can this be relaxed ordered load given the next one is acquire?
Expand All @@ -238,6 +259,9 @@ static uint32_t wait_for_new_command(
num_threads_to_use = pthreadpool_load_acquire_size_t(&threadpool->num_threads_to_use);
qualified_to_run_new_command = thread_id < num_threads_to_use;
while (!qualified_to_run_new_command || (command == last_command)) {
if (command != last_command) {
checkin_worker_thread(threadpool);
}
last_command = command;

/* Wait for new command */
Expand Down Expand Up @@ -405,7 +429,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const struct fxdiv_divisor_size_t num_threads_to_use = fxdiv_init_size_t(min(threads_count.value, capped_num_threads));
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, num_threads_to_use.value - 1 /* caller thread */);
// All threads will check in to ack that they received the command and finished it.
// Not eligible threads skip work but still ack to maintain the invariant mentioned earlier.
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use.value);
#if PTHREADPOOL_USE_FUTEX
pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
Expand Down
18 changes: 16 additions & 2 deletions src/windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ static uint32_t wait_for_new_command(
return command;
}

if (command != last_command) {
const uint32_t event_index = command >> 31;
checkin_worker_thread(threadpool, event_index);
}
// We should come here only if this thread was not eligible to process
// the new command recieved. In that case we must continue to wait to recieve
// next eligible command.
last_command = command;

if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
if (qualified_to_run_new_command && (last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
/* Spin-wait loop */
for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
pthreadpool_yield();
Expand All @@ -83,6 +87,10 @@ static uint32_t wait_for_new_command(
if (qualified_to_run_new_command && (command != last_command)) {
return command;
}
if (command != last_command) {
const uint32_t event_index = command >> 31;
checkin_worker_thread(threadpool, event_index);
}
last_command = command;
}
}
Expand All @@ -97,6 +105,10 @@ static uint32_t wait_for_new_command(
* thread.
* In that case we will wait for change, via futex_wait, in the value of command
*/
if (command != last_command) {
const uint32_t event_index = command >> 31;
checkin_worker_thread(threadpool, event_index);
}
last_command = command;
const uint32_t event_index = (last_command >> 31);
const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
Expand Down Expand Up @@ -252,7 +264,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(

const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const struct fxdiv_divisor_size_t num_threads_to_use = fxdiv_init_size_t(min(threads_count.value, capped_num_threads));
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, num_threads_to_use.value - 1 /* caller thread */);
// All threads will check in to ack that they received the command and finished it.
// Not eligible threads skip work but still ack to maintain the invariant mentioned in pthreads.c.
pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use.value);

if (params_size != 0) {
Expand Down

0 comments on commit 668abfa

Please sign in to comment.