
Provide a helper to assist writing lazy initialized values. #492

Open
bradphelan opened this issue Aug 2, 2023 · 7 comments
Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

Comments

@bradphelan
Contributor

bradphelan commented Aug 2, 2023

It is often desirable to lazily initialize a value. However, we discovered by accident that using std::mutex and std::scoped_lock with taskflow is a very bad idea. The basic lazy pattern looks something like this:

std::optional<int> data;
std::mutex mutex;

int get_data() {
    std::scoped_lock lock(mutex);
    if (data == std::nullopt)
        data = expensive_operation();
    return *data;
}

However, we discovered that this will deadlock under taskflow if expensive_operation itself dispatches tasks using corun: the same thread can end up trying to lock the mutex twice. In short, std::mutex and taskflow do not play nicely together. The correct way is to use taskflow semaphores to restrict access to the critical section of the lazy initialization, but it is not immediately obvious how to set this up. I have a test case which I think shows a model that can work for dynamic tasks. Will this work, or is there a better way to achieve it? It may also be worth adding this use case to the documentation or examples.

The nice thing about using semaphores, I think, is that they suspend the task but not the thread. Is this true?

TEST(Taskflow, Lazy)
{
	using namespace std::chrono_literals;
	tf::Executor executor(8);  // create an executor of 8 workers

	tf::Semaphore semaphore(1);  // create a semaphore with initial count 1

	/// value to be lazy initialized
	std::optional<int> data = std::nullopt;

	/// proof that initialization only occurs once
	std::atomic<int> count = 0;

	// Initialize the lazy value
	auto init = [&]()
	{
		tf::Taskflow taskflow;
		auto task = taskflow.emplace(
			[&]()
			{
				if (data == std::nullopt)
				{

					count++;
					std::cout << "initializing " << count.load() << std::endl;
					std::this_thread::sleep_for(500ms);
					data = 99;
				}
			}).name("lazy init");
		task.acquire(semaphore);
		task.release(semaphore);
		executor.corun(taskflow);
		return *data;
	};


	auto job = [&]()
	{
		EXPECT_EQ(init(),99);
		tf::Taskflow taskflow;
		taskflow.for_each_index(
			0,
			10,
			1,
			[&](int i)
			{
				(void)i;  // suppress unused-parameter warning
				EXPECT_EQ(init(),99);
				std::this_thread::sleep_for(50ms);
			}).name("loop");
		executor.corun(taskflow);
	};


	tf::Taskflow taskflow;
	taskflow.emplace([&]() {

		tf::Taskflow taskflow;
		std::vector<tf::Task> tasks{
			taskflow.emplace(job).name("job 1"),
			taskflow.emplace(job).name("job 2"),
			taskflow.emplace(job).name("job 3"),
			taskflow.emplace(job).name("job 4"),
			taskflow.emplace(job).name("job 5")};

		executor.corun(taskflow);
	});


	executor.run(taskflow).wait();

	EXPECT_EQ(count.load(), 1);
	EXPECT_EQ(data, 99);
}

@bradphelan
Contributor Author

bradphelan commented Aug 2, 2023

I've factored out the pattern into a class called tfLazy designed to work with dynamic tasking.

template <class T>
class tfLazy
{
  /// The internal implementation that all instances share
  struct mwLazyImpl
  {

    /// The function used to generate the result
    const std::function<T()> m_fn;
    /// The executor / scheduler
    tf::Executor& m_executor;
    /// Critical section control
    tf::Semaphore m_semaphore;  
    /// The cached result
    std::variant<std::monostate, T, std::exception_ptr> m_data = std::monostate{};

    mwLazyImpl(std::function<T()> f, tf::Executor& ex)
      : m_fn(f), m_executor(ex), m_semaphore(1), m_data(std::monostate{})
    {
    }

    bool has_value() const { return !std::holds_alternative<std::monostate>(m_data); }

    T* get()
    {
      tf::Taskflow taskflow;
      auto task = taskflow
              .emplace(
                [&]()
                {
                  if (!has_value())
                    try
                    {
                      m_data = m_fn();
                    }
                    catch (...)
                    {
                      m_data = std::current_exception();
                    }
                })
              .name("lazy init");
      task.acquire(m_semaphore);
      task.release(m_semaphore);
      m_executor.corun(taskflow);

      // At this point we hold either an exception or a value
      if (std::holds_alternative<std::exception_ptr>(m_data))
        std::rethrow_exception(std::get<std::exception_ptr>(m_data));

      return &std::get<T>(m_data);
    }
  };

public:
  /// Pass a nullary (factory) function to be evaluated later.
  template <typename Function>
    requires std::is_invocable_r_v<T, Function>
  tfLazy(Function f, tf::Executor& ex)
    : m_impl(std::make_shared<mwLazyImpl>(f, ex))
  {
    /// Returning a raw pointer here is bad behaviour
    /// as it is not clear at all who owns the value
    /// and who is responsible for deleting it.
    static_assert(!std::is_pointer_v<T>, "Factory function should not return a raw pointer");
  }

  T const& operator*() const { return *m_impl->get(); }
  T const* operator->() const { return m_impl->get(); }

  /// Returns true if the result has been calculated
  operator bool() const { return m_impl->has_value(); }

private:
  std::shared_ptr<mwLazyImpl> m_impl;

};

TEST(Taskflow, Lazy)
{
  using namespace std::chrono_literals;
  tf::Executor executor(8);  // create an executor of 8 workers

  /// proof that initialization only occurs once
  std::atomic<int> count = 0;

  tfLazy<int> data(
    [&]()
    {
      count++;
      return 99;
    },
    executor);


  auto job = [&]()
  {
    EXPECT_EQ(*data,99);
    tf::Taskflow taskflow;
    taskflow.for_each_index(
      0,
      100,
      1,
      [&](int i)
      {
        (void)i;  // suppress unused-parameter warning
        EXPECT_EQ(*data,99);
        std::this_thread::sleep_for(5ms);
      }).name("loop");
    executor.corun(taskflow);
  };


  tf::Taskflow taskflow;
  taskflow.emplace([&]() {

    tf::Taskflow taskflow;
    std::vector<tf::Task> tasks{
      taskflow.emplace(job).name("job 1"),
      taskflow.emplace(job).name("job 2"),
      taskflow.emplace(job).name("job 3"),
      taskflow.emplace(job).name("job 4"),
      taskflow.emplace(job).name("job 5")};

    executor.corun(taskflow);

    EXPECT_EQ(count.load(), 1);
    EXPECT_EQ(*data, 99);
  });


  executor.run(taskflow).wait();

}

@bradphelan
Contributor Author

I found this comment from 5 years ago on Reddit that describes exactly my issue:

https://www.reddit.com/r/cpp/comments/8r6lqu/cpptaskflow_fast_c_parallel_programming_with_task/


So it seems this is something you have thought about. Is there a solution?

@tsung-wei-huang
Member

@bradphelan yes, tf::Semaphore is defined at the task level. Basically, when a thread grabs a task that is constrained by a semaphore, it will try to acquire the semaphore. If the acquisition succeeds, the thread runs the task; otherwise, it switches to run another task (similar to coroutine-based multitasking).

@bradphelan
Contributor Author

I would suggest adding a prominent explanation of why using standard thread locks with the taskflow scheduler is problematic. It is now obvious to me what is going on, but when I first saw my non-recursive code recursively calling itself, it took us an hour of staring at the stack trace before the light-bulb moment.

Has any work been done creating a wrapper for taskflow that uses co_await and coroutines?

@tsung-wei-huang
Member

Hi @bradphelan thank you for the suggestions! Currently, we do not support co_await and coroutines, but it's under our research agenda :)

In the meantime, feel free to help contribute/modify our documentation with the necessary explanation, and create a pull request if that is convenient for you. I might be a bit slow in response due to some deadlines.

@bradphelan
Contributor Author

bradphelan commented Aug 4, 2023

A slightly related question.

https://taskflow.github.io/taskflow/AsyncTasking.html#LaunchAsynchronousTasksFromAnSubflow

Joining async tasks from a subflow: does join behave like wait or corun? That is, does it block the current thread, or does it steal tasks that are ready to run while waiting for its own tasks to finish?

I have to give a presentation about taskflow at work, and this is the critical point for users to understand: the coroutine-like behaviour of taskflow versus the thread-blocking behaviour of a traditional coding style. I notice the documentation for join doesn't explicitly say what the behaviour is. I've tried reading the source but I'm still not 100% clear on the internals.

It makes sense that it should behave like corun; otherwise, if all worker threads spawned async tasks and then joined them, the worker pool would deadlock, right?

@bradphelan
Contributor Author

bradphelan commented Aug 4, 2023

I think I have answered this myself.

https://godbolt.org/z/nb5ooezdj

It joins on a specific worker but that worker is still able to process tasks while the joining task is suspended.

In this case perhaps it should be called co_join, to suggest the same behaviour as corun.

@tsung-wei-huang tsung-wei-huang added enhancement New feature or request help wanted Extra attention is needed labels Feb 24, 2024