
equivalent to async #26

Open
maoravni opened this issue May 14, 2018 · 3 comments

@maoravni

What is the LibQ equivalent to std::async? More specifically:

std::future<int> f = std::async([]{
// ...
return 42;
});
int i = f.get();

where the call to f.get() blocks until the result is available?

@grantila
Owner

This is two questions.

The first is the equivalent of std::async, which runs a function in a new thread (or potentially a re-used thread, or potentially later, after other tasks have finished; basically you have no control over it).

In q, you can either schedule a task on an existing threadpool (by associating a scheduler and queue with it), or run a function in a newly created thread using q::run, which requires a name (the thread will be named):

auto prom = q::run( "my-thread", [ ]( ){ return 42; } );
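As a minimal sketch of consuming that result asynchronously (an assumption on my part: that the returned promise carries a default queue, so .then( ) can be chained without passing one explicitly):

```cpp
auto prom = q::run( "my-thread", [ ]( ){ return 42; } );

prom.then( [ ]( int value ){
    // invoked once the thread function has completed, with its return value
} );
```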

The second question is how to block-read a promise value. There is no .get() in q, as I consider blocking reads to generally be an anti-pattern which you should try to avoid. You can however create a blocking dispatcher to provide the same functionality:

auto ec = q::make_execution_context< q::blocking_dispatcher >( "blocking-read" );

int n;
prom
    // copy return value from prom to outer scope
    .then( [ &n ]( int val ){ n = val; } )
    // stop the blocking dispatcher
    .then(
        [ ec ]( ){ ec->dispatcher( )->terminate( q::termination::linger ); },
        ec->queue( )
    );

ec->dispatcher( )->start( ); // This will block until it's terminated (above)

n; // is now 42

Do note that if the thread function throws an exception, the returned promise (prom) will contain this exception, and unless you .fail() or .finally(), the above will be unsafe. Check the "hello world" example here.
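For instance, a hedged sketch of making the chain above safe against a throwing thread function (assuming .fail( ) accepts a handler taking a std::exception reference, and that .finally( ) accepts a queue argument the same way .then( ) does):

```cpp
prom
    // copy return value from prom to outer scope
    .then( [ &n ]( int val ){ n = val; } )
    // handle any exception thrown by the thread function
    .fail( [ ]( std::exception& e ){
        std::cerr << "task failed: " << e.what( ) << std::endl;
    } )
    // stop the blocking dispatcher whether the task succeeded or failed
    .finally(
        [ ec ]( ){ ec->dispatcher( )->terminate( q::termination::linger ); },
        ec->queue( )
    );
```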

@maoravni
Author

My use-case for using the library was for implementing a multi-producer single-consumer system. Having a block-read is one of the requirements.
Re-use is also a primary requirement, as this is a performance-critical system. I wouldn't want to fire up a thread and then terminate it for a single task.

Is there another way of enqueuing tasks/continuations into a thread, blocking, and then extracting the return data from the promise?

@grantila
Owner

Why would a blocking read be a requirement? Sequential handling of completions I can understand, but nothing ever really needs to block.

If it's performance critical and you don't want to fire up a thread for a single task, note that what std::async does is unspecified; it might do exactly that. You probably want:

  • A "blocking" main-thread
  • A threadpool

You put tasks on a queue which is bound to a thread pool - and allow them to execute as soon as possible.
You read the results of these concurrently-executing tasks on a single "main" thread (so the results are read and handled sequentially).

// Make blocking "main" execution context
auto ec_main = q::make_execution_context< q::blocking_dispatcher >( "main" );
// Make an execution context for a threadpool with 4 threads
// This context's _completion_ is scheduled on the blocking dispatcher's queue...
auto ec_pool = q::make_execution_context< q::threadpool, q::direct_scheduler >( "main background pool", ec_main->queue( ), 4 );

// This is a function which should run in the threadpool:
int some_function( ); // defined somewhere else

// Add task "fn" to the threadpool:
auto promise = q::make_promise( ec_pool->queue( ), some_function );

promise.then(
    [ ]( int value ){ /* do something */ },
    ec_main->queue( ) // Will run on the blocking thread
);

ec_main->dispatcher( )->start( ); // Will block

However, if you're going to continuously read results from background tasks and handle the results, consider using a q::channel. It has a backlog which you can likely ignore unless you want to handle upstream pressure.

// Create a channel with the blocking dispatchers' queue as default.
// Backlog of 5, can be ignored unless you need to handle upstream pressure
q::channel< int > ch( ec_main->queue( ), 5 );

auto readable = ch.get_readable( );
auto writable = ch.get_writable( );

auto completion = readable.consume(
    // Will be called for each completed job and run sequentially because
    // this function is called on the blocking dispatcher's "main" thread
    [ ]( int value ){ /* handle value */ }
);
// When all tasks are done, close the writable:
writable.close( );
// The result of consume() is a promise which will be resolved when the writable is closed
completion.then( [ ]( ){ std::cout << "Done" << std::endl; } );

Each background task can then capture writable and call writable.write( 42 ); when it finishes. These integer values (in this example) are written to the channel thread-safely, and read sequentially by the consumer.
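Putting this together, a background task scheduled with q::make_promise (as in the threadpool example above) might report its result like so; a sketch, assuming writable can be captured by copy into the lambda:

```cpp
// Schedule a job on the threadpool; it reports its result through the channel
// rather than through the returned promise.
q::make_promise( ec_pool->queue( ), [ writable ]( ) mutable {
    int result = 42; // stand-in for some expensive computation
    writable.write( result );
} );
```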
