
Deadlock when using nested tasks + semaphore #506

Open
olologin opened this issue Sep 12, 2023 · 25 comments

Labels
question Further information is requested

Comments

@olologin

olologin commented Sep 12, 2023

Describe the bug
With some low probability, the following unit test deadlocks instead of completing.
It seems tf somehow "forgets" to execute the most nested task, which is supposed to do the actual initialization of the lazy value, and the rest of the threads are either waiting for a task or just stuck.

To Reproduce
Just paste the following snippet into any unit-test file, build, and run it; the issue normally shows up around iterations 200-400.

TEST_CASE("LazyInit_Deadlocks.16threads")
{
	using namespace std::chrono_literals;

	for (size_t i = 0; i < 10000; ++i)
	{
		//std::cout << i << std::endl;
		tf::Executor executor(16);
		tf::Semaphore semaphore(1);
		std::atomic_bool has_value = false;
		
		// The following lambda simulates lazy initializer of some complex and slow to initialize variable
		auto lazy_data = [&]()
		{
			// If we already have a value, return it
			if (!has_value.load(std::memory_order_acquire))
			{
				// Otherwise, calculate it in a task
				tf::Taskflow taskflow2;
				// Let's imagine the following task is just some slow and complex initialization of some meaningful variable
				auto task = taskflow2.emplace([&]() {
					if (!has_value.load(std::memory_order_acquire))
					{
						tf::Taskflow taskflow3;
						taskflow3.emplace([&] { std::this_thread::sleep_for(1ms); });
						executor.corun(taskflow3);
						has_value.store(true, std::memory_order_release);
					}
					});
				task.acquire(semaphore);
				task.release(semaphore);
				executor.corun(taskflow2);
			}
			return 99;
		};

		tf::Taskflow taskflow1;
		for (size_t k = 0; k < 16; ++k)
		{
			taskflow1.emplace(
				[&]
				{
					lazy_data();
				});
		}

		// If the following line fails - you are experiencing a deadlock
		REQUIRE(executor.run(taskflow1).wait_for(5s) == std::future_status::ready);
	}
}

Desktop (please complete the following information):
Taskflow 3.6, Windows 10, VS2019, x64 build, Ryzen 3800X CPU.

Additional context
No other information available.

@bradphelan
Contributor

bradphelan commented Sep 13, 2023

A slight variation on the above test that makes it a bit tidier but still deadlocks:

TEST_CASE("Semaphore.Deadlock")
{
    using namespace std::chrono_literals;

    for (size_t i = 0; i < 10000; ++i)
    {
        tf::Executor executor(16);
        tf::CriticalSection critical(1);

        auto lazy_data = [&](tf::Subflow &rt)
        {
            auto task = rt.emplace([&](tf::Subflow&rt)
            {
                // (A) If this line is commented out then the test passes
                rt.emplace([&] {  });

                rt.join();
            });

            // (B) If this line is commented out then the test passes
            critical.add(task);

            rt.join();
        };

        tf::Taskflow flow;
        for (size_t k = 0; k < 16; ++k)
        {
            auto task = flow.emplace(
                [&](tf::Subflow&rt)
                {
                    lazy_data(rt);
                });
        }

        std::cout << i << std::endl;

        // If the following line fails - you are experiencing deadlock
        REQUIRE(executor.run(flow).wait_for(5s) == std::future_status::ready);


    }
}

With both (A) and (B) present it deadlocks; commenting out either one makes the test pass.

@bradphelan
Contributor

bradphelan commented Sep 13, 2023

The pull request is just for the failing test. No solution is provided.

TEST_CASE("Semaphore.Deadlock")
{
    using namespace std::chrono_literals;

    for (size_t i = 0; i < 10000; ++i)
    {
        tf::CriticalSection critical(1);
        tf::Executor executor(16);

        auto lazy_data = [&](tf::Subflow& rt)
        {
            auto task = rt.emplace([&](tf::Subflow& rt)
            {
                // (A) If this line is commented out then the test passes
                rt.emplace([&] {});
                rt.join();
            });

            // (B) If this line is commented out then the test passes
            critical.add(task);

            rt.join();
        };

        tf::Taskflow flow;
        for (size_t k = 0; k < 16; ++k)
        {
            auto task = flow.emplace(
                [&](tf::Subflow& rt)
                {
                    lazy_data(rt);
                });
        }

        std::cout << i << std::endl;

        // If the following line fails - you are experiencing deadlock
        REQUIRE(executor.run(flow).wait_for(5s) == std::future_status::ready);
    }
}

@tsung-wei-huang
Member

The problem happens because tf::Semaphore and tf::CriticalSection are not thread-safe. In your case, multiple subflows will try to add their tasks into the critical section. Does this make sense?

In practice, we do not recommend using tf::Semaphore or tf::CriticalSection within a recursive subflow.

@tsung-wei-huang tsung-wei-huang added the question Further information is requested label Sep 28, 2023
@bradphelan
Contributor

How, then, does one implement critical sections within recursive subflows? One cannot use mutexes because of the potential to deadlock the thread pool.

@olologin
Author

olologin commented Sep 29, 2023

I think in our particular case we can redesign our functionality to start only one initializing task instead of multiple tasks, with the rest of the workers simply sitting in corun_until until initialization is complete. This should protect us from deadlocks because, with only one such task, there is no way to run the initializing task recursively.
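
A minimal sketch of that idea, assuming tf::Executor::corun_until (available in Taskflow 3.6); claimed, slow_initialization, and executor are illustrative names, and as the update below notes, this turned out to be insufficient:

std::atomic<bool> claimed{false};
std::atomic<bool> has_value{false};

auto lazy_get = [&]()
{
	if (!has_value.load(std::memory_order_acquire))
	{
		if (!claimed.exchange(true, std::memory_order_acq_rel))
		{
			// Only the first caller performs the initialization
			slow_initialization();
			has_value.store(true, std::memory_order_release);
		}
		else
		{
			// Everyone else keeps the worker busy with other ready tasks until it is done
			executor.corun_until([&] { return has_value.load(std::memory_order_acquire); });
		}
	}
	return 99;
};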

But I am still puzzled about:

tf::Semaphore or tf::CriticalSection is not thread-safe

I don't understand in which sense they are not thread-safe. They are constructed in a single thread, and after that we should be able to use them from any number of workers we like, no? Or is there some limitation that does not let us enqueue tasks associated with semaphores concurrently?

@tsung-wei-huang
Member

Sorry for the confusion. For tf::Semaphore, calling tf::Task::acquire or tf::Task::release adds that task to a std::vector waitlist of that semaphore, and this operation is not thread-safe (see here). Does that make sense? (It's similar for tf::CriticalSection.)

@olologin
Author

@tsung-wei-huang Thanks, now I understand.

@olologin
Author

olologin commented Sep 30, 2023

@tsung-wei-huang Sorry, I looked closer at the sources and I think I don't understand where this race condition is.
The vector you referred to (here) seems to be task-specific, and since we modify each task (calling acquire and release) from only a single thread at a time, it should not be a problem. And Semaphore has a vector _waiters, but its modification seems to always be synchronized via a std::mutex.

Update

I think in our particular case we can redesign our functionality to only start one initializing task instead of multiple tasks

It seems I was wrong, we still need to use some kind of tf::CriticalSection to implement lazy initialization reliably.

@bradphelan
Contributor

bradphelan commented Oct 2, 2023

Let me show the full source code and unit test for our Lazy class. @tsung-wei-huang can then see more clearly what we are trying to achieve. The key observations are that:

  1. The lazy data is referenced within a taskflow task
  2. The factory function used to generate the lazy data spawns tasks
  3. std::scoped_lock is unsuitable because the scheduler could quickly exhaust all threads in the thread pool and deadlock
  4. tf::Semaphore is unsuitable because it seems to have a deadlock bug/feature with the way we are using it.
// Copyright 2024 Moduleworks gmbh
#pragma once
#include <variant>
#pragma warning(push)
#pragma warning(disable : 4324 4456 5046)
#include <atomic>
#include <taskflow/taskflow.hpp>
#pragma warning(pop)

namespace mw::tf
{
enum class LazyProtocol
{
	/// Evaluate and store the result lazily
	Lazy,
	/// Evaluate and store the result eagerly
	Eager,
	/// Never store the result. Recalc every time
	NoCache
};

/// Provides lazy initialization of a constant under taskflow.
/// It uses taskflow semaphores to control access to the critical
/// section rather than std::mutex and std::scoped_lock which
/// may deadlock or cause poor performance for the taskflow
/// scheduler.
template <class T, LazyProtocol TProtocol = LazyProtocol::Lazy>
class Lazy
{
	/// The internal implementation that all instances share
	struct LazyImpl
	{
		/// The function used to generate the result
		const std::function<T()> m_fn;
		/// Critical section control
		::tf::Semaphore m_semaphore;
		/// The cached result
		std::variant<std::monostate, T, std::exception_ptr> m_data = std::monostate{};
		/// Atomic flag to declare the result is ready
		std::atomic<bool> m_has_value = false;

		LazyImpl(std::function<T()> f) : m_fn(f), m_semaphore(1), m_data(std::monostate{})
		{
			if constexpr (TProtocol == LazyProtocol::Eager)
			{
				Update();
				CheckHeldException();
			}
		}

		LazyImpl(LazyImpl const&) = delete;
		LazyImpl(LazyImpl&&) = delete;

	private:
		void CheckHeldException()
		{
			if (std::holds_alternative<std::exception_ptr>(m_data))
				std::rethrow_exception(std::get<std::exception_ptr>(m_data));
		}
		void Update()
		{
			if (!m_has_value.load(std::memory_order_acquire))
			{
				try
				{
					m_data = m_fn();
				}
				catch (...)
				{
					m_data = std::current_exception();
				}
				m_has_value.store(true, std::memory_order_release);
			}
		}

	public:
		T* get()
		{
			if constexpr (TProtocol == LazyProtocol::NoCache)
			{
				// Recalculate on every access and hand back a pointer to the latest result
				m_data = m_fn();
				return &std::get<T>(m_data);
			}
			else
			{
				if constexpr (TProtocol == LazyProtocol::Lazy)
				{
					// If we already have a value, return it
					if (!m_has_value.load(std::memory_order_acquire))
					{
						// Otherwise, calculate it in a task so that we don't block
						// the current worker thread.
						::tf::Taskflow taskflow;
						const TaskContextSaver tcSaver;

						auto task = taskflow.emplace(
							[this, &tcSaver]()
							{
								// Set FP and thread priority
								TaskContextApplier tcApplier(tcSaver);
								// Initialize value
								Update();
							});
						task.acquire(m_semaphore);
						task.release(m_semaphore);
						mw::tf::Schedule(taskflow);
					}
				}
				// At this point we hold either an exception or a value
				CheckHeldException();
				return &std::get<T>(m_data);
			}
		}
	};

public:
	/// Pass a nullary (factory) function to be evaluated later.
	/// @param f nullary (factory) function to generate the value. Will be called only once
	template <typename Function>
	requires std::is_invocable_r_v<T, Function> Lazy(Function f)
		: m_impl(std::make_shared<LazyImpl>(f))
	{
		/// Returning a raw pointer here is bad behaviour
		/// as it is not clear at all who owns the value
		/// and who is responsible for deleting it.
		static_assert(!std::is_pointer_v<T>, "Factory function should not return a raw pointer");
	}

	/// Get the value. May suspend the current task and schedule another one while the
	/// lazy value is being calculated from another location
	T const& operator*() const { return *m_impl->get(); }

	/// Get the value. May suspend the current task and schedule another one while the
	/// lazy value is being calculated from another location
	T const* operator->() const { return m_impl->get(); }

	/// Returns true if the result has been calculated
	operator bool() const { return m_impl->m_has_value.load(std::memory_order_acquire); }

private:
	std::shared_ptr<LazyImpl> m_impl;
};
}  // namespace mw::tf

and the original test case showing the deadlock.

TEST(Taskflow, Lazy)
{
	using namespace std::chrono_literals;
	tf::Executor executor(8);  // create an executor of 8 workers

	/// proof that initialization only occurs once
	std::atomic<int> count0 = 0;
	std::atomic<int> count1 = 0;

	mw::tf::Lazy<int> data(
		[&]()
		{
			count0++;
			return 99;
		});


	auto job = [&]()
	{
		EXPECT_EQ(*data, 99);
		tf::Taskflow taskflow;
		taskflow
			.for_each_index(
				0,
				100,
				1,
				[&]([[maybe_unused]] int i)
				{
					EXPECT_EQ(*data, 99);
					std::this_thread::sleep_for(5ms);
					count1++;
				},
				::tf::StaticPartitioner(1))
			.name("loop");
		executor.corun(taskflow);
	};


	tf::Taskflow taskflow;
	taskflow.emplace(
		[&]()
		{
			tf::Taskflow taskflow;
			std::vector<tf::Task> tasks{
				taskflow.emplace(job).name("job 1"),
				taskflow.emplace(job).name("job 2"),
				taskflow.emplace(job).name("job 3"),
				taskflow.emplace(job).name("job 4"),
				taskflow.emplace(job).name("job 5")};

			executor.corun(taskflow);

			EXPECT_EQ(count0.load(), 1);
			EXPECT_EQ(count1.load(), 500);
			EXPECT_EQ(*data, 99);
		});


	executor.run(taskflow).wait();
}

@tsung-wei-huang
Member

@olologin, in your examples you will have multiple subflows that run simultaneously, while each of them has a task to acquire the semaphore/critical section. Basically, tf::Semaphore and tf::CriticalSection are lazy: you describe which task needs to grab it, and then the runtime performs the semaphore/critical-section behavior when the taskflow is submitted to the executor. Does this make sense?
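
For example, the declarative pattern the current API expects looks like this (a minimal sketch of standard tf::Semaphore usage):

#include <cstdio>
#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor(4);
  tf::Taskflow taskflow;
  tf::Semaphore semaphore(1);  // at most one concurrent holder

  // The acquire/release constraints are declared on the tasks up front...
  for (int i = 0; i < 4; ++i) {
    tf::Task task = taskflow.emplace([i] { std::printf("task %d\n", i); });
    task.acquire(semaphore);
    task.release(semaphore);
  }

  // ...and are enforced by the scheduler only when the taskflow is submitted.
  executor.run(taskflow).wait();
}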

I will be happy to do a conference call if that works better for you :) feel free to reach out at [email protected]

@bradphelan
Contributor

bradphelan commented Oct 3, 2023

I'm not sure what point you are trying to make. We can assume you are correct that there is a race condition with tf::Semaphore and nested subflows. However, I don't particularly care which mechanism we use, as long as it works. The requirement is for lazily cached data where the factory function that generates the data uses nested subflows. We have found that this does not work with std::scoped_lock because you can end up locking the mutex twice on the same thread. Using std::recursive_mutex is not the answer either, as you still end up with two tasks inside the critical section. tf::Semaphore, together with launching a task to do the factory operation, seems to have the right semantics, but as you state there is a design problem with tf::Semaphore that prevents what we are trying to do. In summary:

  • Can't use std::mutex as it causes deadlock
  • Can't use std::recursive_mutex as it doesn't provide the exclusion guarantees we want
  • Can't use tf::Semaphore because of design issues.

Can you suggest a way forward here?

@tsung-wei-huang
Member

I am thinking of redesigning the current semaphore to support more dynamic and agile usage. For instance:

tf::Semaphore2 semaphore2;

taskflow.emplace([&](tf::Runtime& rt){
   // do something
   // ...
   rt.acquire(semaphore2);
   // do another thing
   // ...
});

taskflow.emplace([&](tf::Runtime& rt){
   // do something
   // ...
   rt.release(semaphore2);
   // do another thing
   // ...
});

Instead of declaring which task acquires/releases a semaphore through the tf::Task handle, the two new methods tf::Runtime::acquire and tf::Runtime::release allow the current task to acquire and release any semaphore while interacting with the scheduler immediately. Thoughts?

@bradphelan
Contributor

bradphelan commented Oct 14, 2023 via email

@olologin
Author

allow the current task to acquire and release any semaphores while interacting with the current scheduler immediately. Thought?

As I understand it, this will also make semaphores thread-safe? I guess this will resolve our current usage scenario; it would be great to have this.

@tsung-wei-huang
Member

@olologin yes, this will need to be thread-safe.

@VincentXWD
Collaborator

VincentXWD commented Oct 23, 2023

Suppose two tasks (task A and task B) try to acquire the same semaphore, and A gets it first and runs for a long time. Currently, task B may occupy a thread until task A releases the semaphore (or even later). If we had a coroutine pool replacing the thread pool, that would solve this problem, right?

@bradphelan
Contributor

bradphelan commented Oct 24, 2023 via email

@VincentXWD
Collaborator

VincentXWD commented Oct 24, 2023

@bradphelan Could you please give some more details of this design you mentioned?

A task that yields never gives up its occupation of a thread; the workload only releases its time slice, not the thread. It will also keep trying to run on the CPU and waste runtime (how frequently depends on the scheduler implementation, I think). The worst case is that all workers are occupied by such tasks. So I think the only way to solve this (I mean, to implement the in-task acquire/release semaphore) is to use coroutines.

@bradphelan
Contributor

bradphelan commented Oct 25, 2023

It's part of the current design of Taskflow. If a task calls Executor::corun, the task yields to the scheduler, and corun doesn't return until the scheduler decides that the task is ready to run again.

I'm not sure what you mean

The workload only releases the time slice not the thread.

There is no time slicing. It is not a real-time scheduler and not a pre-emptive scheduler. Tasks run until they finish or yield.

Note: I'm using the term "yield"; I'm not sure this is the term @tsung-wei-huang would use. Please correct my terminology if this is confusing.

In some ways Taskflow is a bit like using coroutines, but not quite. Each task uses the stack of the thread it runs on. This can be quite confusing in the debugger: tasks that are not recursive can appear recursive there, because multiple tasks can share the same stack.
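
For concreteness, a minimal sketch of the corun behavior described above:

#include <cstdio>
#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor(2);
  tf::Taskflow outer;

  outer.emplace([&]() {
    tf::Taskflow nested;
    nested.emplace([] { std::puts("nested task"); });
    // Yields this worker to the scheduler until "nested" completes; meanwhile the
    // worker may pick up and run other ready tasks on top of this task's stack.
    executor.corun(nested);
  });

  executor.run(outer).wait();
}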

@VincentXWD
Collaborator

Hi @bradphelan, thank you for the information. I'm talking about this possible semaphore design:

tf::Semaphore2 semaphore2;

taskflow.emplace([&](tf::Runtime& rt){
   // do something
   // ...
   rt.acquire(semaphore2);
   // do another thing
   // ...
});

taskflow.emplace([&](tf::Runtime& rt){
   // do something
   // ...
   rt.release(semaphore2);
   // do another thing
   // ...
});

In my opinion, acquire/release may stall the worker thread if we keep the current thread pool model.

@bradphelan
Contributor

@VincentXWD rt.acquire() could be implemented the same way as corun. If the semaphore is free, acquire simply returns. If the semaphore is not free, the scheduler takes the next ready task and runs it. When that task is finished, the semaphore is checked again; if it is free, the scheduler simply returns. If it is not free, the process repeats.
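
A rough sketch of that loop; _try_acquire and _run_one_ready_task stand in for hypothetical scheduler internals, not actual Taskflow functions:

// Hypothetical internals, only to illustrate the corun-like loop described above
void Runtime::acquire(Semaphore& s)
{
    while (!s._try_acquire())  // atomically take the semaphore if it is free
    {
        // Semaphore busy: instead of blocking the worker, run another ready task
        // (the same strategy corun uses), then re-check the semaphore.
        _executor._run_one_ready_task(_worker);
    }
}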

@olologin
Author

olologin commented Nov 29, 2023

One of our guys debugged this sample in detail, and it seems we were all wrong. It is not a data race in the semaphore's vector, and it is not going to be solved by reimplementing the semaphore.

What is actually happening:
Let's look at this source; it is the same as in the original post, but slightly restructured to show its intent better:

TEST_CASE("LazyInit_Deadlocks.16threads")
{
	using namespace std::chrono_literals;

	for (size_t i = 0; i < 10000; ++i)
	{
		// std::cout << i << std::endl;
		tf::Executor executor(16);
		tf::Semaphore semaphore(1);
		std::atomic_bool has_value = false;

		// The following lambda simulates lazy initializer of some complex and slow to initialize
		// variable
		auto lazy_init = [&](auto FSlowFunction)
		{
			// If we already have a value, return it
			if (!has_value.load(std::memory_order_acquire))
			{
				// Otherwise, calculate it in a task
				tf::Taskflow b;
				// Let's imagine the following task is just some slow and complex initialization of
				// some meaningful variable
				auto task = b.emplace([&]() {  // Creates tasks b0, b1, ... b15
					if (!has_value.load(std::memory_order_acquire))
					{
						FSlowFunction();
						has_value.store(true, std::memory_order_release);
					}
				});
				task.acquire(semaphore);
				task.release(semaphore);
				executor.corun(b);
			}
			return 99;
		};

		tf::Taskflow a;
		{
			// We have no control over what happens in this block of code
			// This is completely owned by the users of lazy_init
			
			double pi = 0;
			for (size_t k = 0; k < 16; ++k)
			{
				a.emplace(  // Creates tasks a0, a1, ... a15
					[&]
					{
						auto FSlowFunction = [&]()
						{
							tf::Taskflow c;
							c.emplace( // Creates subtask c0, it is single only because lazy_init is supposed to initialize pi once
								[&] { std::this_thread::sleep_for(1ms); });
							executor.corun(c);
							// After all these hard calculations of c0 we found value of Pi
							pi = 3.14159265359;
						};
						lazy_init(FSlowFunction);
						std::cout << "Calculated pi value: " << pi << std::endl;
					});
			}
		}

		// If the following line fails - you are experiencing a deadlock
		REQUIRE(executor.run(a).wait_for(5s) == std::future_status::ready);
	}
}

If we debug this, we see that c0 is properly executed, but we get into a deadlock because one of our workers ends up with this stack:
a0 -> b0 -> a1 -> b1 -> Deadlock
In b0 we acquire the semaphore and enqueue c0; c0 gets stolen by another worker while we call corun. Corun picks up the outermost task a1, which in turn tries to corun b1 and to acquire the same semaphore. Since the semaphore is already held by b0, this is impossible, and we are stuck forever inside corun on the line executor.corun(c).

Since this executor.corun(c) line is not part of our Lazy class, but rather something the user wraps into the lazy initializer, we cannot change that line to something else. At the same time, we cannot alter the behavior of corun(c), for example by throwing an exception, because that would falsely fail the b1 task, or could lead to an incorrect pi value being printed.

It seems we are using the semaphore/mutex incorrectly here, but currently there is no good way to reimplement lazy_init without blocking all but one thread.
Potential ways to resolve this require us to prevent recursive loops like b* -> a* -> b*:

  1. Spawn a new executor and run b* in it (see the sketch after this list), but this has a bit of overhead.
  2. Have something like task_arena from Intel TBB and spawn the b* tasks in a new task_arena, but this is not implemented in Taskflow; duplicate of Task Isolation like TBB #522
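
A minimal sketch of option 1, using a dedicated executor (init_executor is an illustrative name) so that b* tasks can never be interleaved with a* tasks on one worker stack:

tf::Executor init_executor(1);  // private executor just for the b* initialization tasks

auto lazy_init = [&](auto FSlowFunction)
{
	if (!has_value.load(std::memory_order_acquire))
	{
		tf::Taskflow b;
		auto task = b.emplace([&]() {
			if (!has_value.load(std::memory_order_acquire))
			{
				FSlowFunction();
				has_value.store(true, std::memory_order_release);
			}
		});
		task.acquire(semaphore);
		task.release(semaphore);
		// Blocks this worker thread, but cannot recurse into the outer a* tasks.
		// Note: work spawned inside FSlowFunction must then also target
		// init_executor (or use run().wait()) rather than corun on the outer executor.
		init_executor.run(b).wait();
	}
	return 99;
};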

@olologin
Author

olologin commented Dec 16, 2023

Btw, this is how it looks in Intel TBB:
Doc: https://oneapi-src.github.io/oneTBB/main/tbb_userguide/design_patterns/Lazy_Initialization.html
Source: https://github.com/oneapi-src/oneTBB/blob/master/include/oneapi/tbb/collaborative_call_once.h
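
Roughly, the TBB pattern is the following; compute_pi_in_parallel is a placeholder for the user's parallel factory function:

#include <oneapi/tbb/collaborative_call_once.h>

oneapi::tbb::collaborative_once_flag once;
double pi = 0;

double get_pi()
{
    // Exactly one caller runs the functor; other threads that arrive while it is
    // running collaborate on its nested parallel work instead of blocking a thread.
    oneapi::tbb::collaborative_call_once(once, [&] {
        pi = compute_pi_in_parallel();  // placeholder: may spawn nested TBB parallelism
    });
    return pi;
}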

@olologin
Author

olologin commented Feb 27, 2024

We will try to implement task isolation soon, because one of our developers is interested in implementing it, and we will propose a PR if we make it work :)

The idea so far is to create a separate API class, TaskArena, which owns a separate TaskQueue.
Then create a separate API method like Executor::isolate(TaskArena& ta, Callable&& c).
On entry into Executor::isolate, we can temporarily switch the calling thread's task queue to the TaskQueue of the TaskArena object.
We will then adjust the task-stealing loop so that it can also steal from other TaskQueues (owned by TaskArena objects) if the main TaskQueue is empty. Of course, a worker can only steal tasks from its own TaskArena, or from any TaskQueue/TaskArena if it has no particular TaskArena associated.

Pseudo C++ code for the additions to the API (this is not final):

class TaskArena
{
    // Contains separate task queues, one per worker
    std::vector<TaskQueue<Node*>> _wsq_per_worker_id;
    ...
};

// New API function
template<typename Callable>
void Executor::isolate(TaskArena& h, Callable&& c)
{
    auto& w = _this_worker();
    auto previous_task_arena_ptr = w._task_arena_ptr;
    w._task_arena_ptr = &h;
    c();
    w._task_arena_ptr = previous_task_arena_ptr;
}

// The worker task-stealing loops also need to be adjusted to use the queues from
// w._task_arena_ptr at the right moment.

Example of usage:

tf::Executor e;

tf::Taskflow taskflow;
tf::TaskArena arena(e);

thread_local bool is_outer_task_running = false;
taskflow.for_each_index(0, 32, 1, [&](int i)
{
    if(is_outer_task_running)
        throw std::runtime_error("Cannot happen");
     
    is_outer_task_running = true;
    e.isolate(arena, [&]{
         // Inside of this functor our worker cannot take tasks that are older than this task (e.g. from the outer for_each_index)
         // We can achieve this by switching current thread to another task queue
         tf::Taskflow child_taskflow;
         child_taskflow.for_each_index(0, 2, 1, [&](int j){}); // These tasks will be spawned inside of our new TaskQueue of TaskArena "arena"
         e.corun(child_taskflow); // Guaranteed to never run tasks from the outer loop
    });
    is_outer_task_running = false;
});
e.run(taskflow).wait();

@tsung-wei-huang
Member

tsung-wei-huang commented Feb 27, 2024

I think this is a genius design! Perhaps TaskArena can be created from an executor with either unique or shared ownership? In this case, TaskArena can be initialized with information available in the executor (e.g., the number of workers).
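
To make the ownership question concrete, construction could hypothetically look like this (none of these constructors or factories exist yet):

tf::Executor executor(8);

// Unique ownership: arena sized from executor.num_workers()
tf::TaskArena arena(executor);

// Shared ownership: a hypothetical factory tying the arena's lifetime to a shared_ptr
std::shared_ptr<tf::TaskArena> shared_arena = executor.make_arena();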
