
Allow AsyncGenerator or AsyncIterator for "args" #46

Open
Ikatono opened this issue May 4, 2024 · 8 comments

@Ikatono

Ikatono commented May 4, 2024

It would be useful to be able to start some tasks before all of the arguments are available. This appears to be supported in trimeter, although I haven't tested it, since its last commit was 4 years ago.

As an example use case, I'm currently trying to consume a paginated API. I have an asynchronous generator that yields the items from a page while pre-loading the next page. As aiometer stands, I believe I need to either generate the whole list of items up front (which requires downloading each page sequentially until I get an empty page), or yield chunks of results and call aiometer.amap on each chunk. It would be cleaner if I could pass the AsyncGenerator directly to amap and have it await items as needed.
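A minimal sketch of the kind of generator described above, in plain asyncio. fetch_page is a hypothetical stand-in for the real API call, and an empty page is assumed to mark the end of the data. Because it uses asyncio.create_task for the prefetch, this version only runs on the asyncio backend, not under trio.

```python
import asyncio
from typing import AsyncIterator, List


async def fetch_page(page: int) -> List[str]:
    """Hypothetical API call: pages 0-2 hold three items, later pages are empty."""
    await asyncio.sleep(0.01)  # simulate network latency
    if page >= 3:
        return []
    return [f"item-{page}-{i}" for i in range(3)]


async def iter_items() -> AsyncIterator[str]:
    """Yield items one at a time while the next page downloads in the background."""
    page = 0
    next_page = asyncio.create_task(fetch_page(page))
    try:
        while True:
            items = await next_page
            if not items:
                return  # an empty page marks the end of the data
            page += 1
            # Start downloading the following page before handing out these items.
            next_page = asyncio.create_task(fetch_page(page))
            for item in items:
                yield item
    finally:
        next_page.cancel()  # no-op if the download already finished


async def collect() -> List[str]:
    return [item async for item in iter_items()]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

The consumer never sees page boundaries, which is exactly the shape that would be convenient to hand straight to amap.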

@florimondmanca
Owner

Hi @Ikatono

This sounds interesting, but I'm having a hard time seeing why you'd need aiometer to iterate over that async iterator.

If you could come up with some example code you'd like to be able to write, or show how you would write it with trimeter, I think that would help clarify things.

@Ikatono
Author

Ikatono commented May 4, 2024

Sure. So here's what my current code looks like:

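import aiometer

async def async_chunk_generator():
    # Placeholder: yields one list (chunk) of items per page of the API.
    for chunk in []:
        yield chunk

async def main():
    async for chunk in async_chunk_generator():
        await aiometer.run_on_each(action, chunk, max_at_once=mao, max_per_second=mps)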

With trimeter I should be able to write this instead:

import trimeter

async def async_generator():
    # Placeholder: yields individual items, not chunks.
    for item in []:
        yield item

async def main():
    await trimeter.run_on_each(action, async_generator(), max_at_once=mao, max_per_second=mps)

Besides saving me from having to chunk my generator, the second version keeps max_at_once and max_per_second consistent throughout the entire process. In the first version, every task has to finish at the end of each chunk before the next chunk can start, even when there are fewer than max_at_once tasks remaining, so concurrency slots sit idle.
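To make the chunk-boundary cost concrete, here is a small asyncio simulation. It does not use aiometer: an asyncio.Semaphore stands in for max_at_once, action is a hypothetical task that just sleeps, and the durations are made up to alternate long and short tasks.

```python
import asyncio
import time
from typing import List

MAX_AT_ONCE = 2
CHUNK_SIZE = 2
# Hypothetical workload: task durations in seconds, alternating long/short.
DURATIONS = [0.2, 0.02, 0.2, 0.02]


async def action(duration: float) -> None:
    await asyncio.sleep(duration)


async def run_limited(durations: List[float], max_at_once: int) -> None:
    """Run every item, never more than max_at_once at the same time."""
    sem = asyncio.Semaphore(max_at_once)

    async def worker(d: float) -> None:
        async with sem:
            await action(d)

    await asyncio.gather(*(worker(d) for d in durations))


async def chunked() -> float:
    start = time.perf_counter()
    for i in range(0, len(DURATIONS), CHUNK_SIZE):
        # Each chunk must drain completely before the next one starts.
        await run_limited(DURATIONS[i:i + CHUNK_SIZE], MAX_AT_ONCE)
    return time.perf_counter() - start


async def streamed() -> float:
    start = time.perf_counter()
    # One limiter across all items: a freed slot is refilled right away.
    await run_limited(DURATIONS, MAX_AT_ONCE)
    return time.perf_counter() - start
```

With these numbers the chunked run takes roughly 0.4 s (two 0.2 s chunks back to back), while the streamed run takes roughly 0.22 s, because the short task's slot is reused immediately instead of waiting for the chunk's long task.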

Looking at trimeter's source, it converts non-async iterables to async ones and then works through them in an async for loop:

        async with trio.open_nursery() as nursery:
            index = 0
            async for value in iterable:
                for meter_state in meter_states:
                    await meter_state.wait_task_can_start()
                for meter_state in meter_states:
                    meter_state.notify_task_started()
                nursery.start_soon(_worker, async_fn, value, index, config)
                index += 1

The equivalent section in aiometer is:

    async with anyio.create_task_group() as task_group:
        for index, value in enumerate(args):
            for state in meter_states:
                await state.wait_task_can_start()

            for state in meter_states:
                await state.notify_task_started()

            task_group.start_soon(_worker, async_fn, index, value, config)

So maybe something along the lines of:

    async with anyio.create_task_group() as task_group:
        index = 0
        async for value in async_iterable:
            for state in meter_states:
                await state.wait_task_can_start()

            for state in meter_states:
                await state.notify_task_started()

            task_group.start_soon(_worker, async_fn, index, value, config)
            index += 1
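For comparison, here is a self-contained sketch of the same loop shape in plain asyncio. It is not aiometer's code: an asyncio.Semaphore plays the role of meter_states (only max_at_once, no max_per_second), and run_all_async is a hypothetical name. Acquiring a slot before create_task means the async generator is advanced at most one item beyond what can actually be started.

```python
import asyncio
from typing import AsyncIterable, Awaitable, Callable, Dict, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_all_async(
    async_fn: Callable[[T], Awaitable[R]],
    args: AsyncIterable[T],
    max_at_once: int,
) -> List[R]:
    """Hypothetical helper: call async_fn on each item of an async iterable,
    with at most max_at_once calls in flight; results come back in input order."""
    slots = asyncio.Semaphore(max_at_once)  # stands in for meter_states
    results: Dict[int, R] = {}
    tasks: List[asyncio.Task] = []

    async def worker(index: int, value: T) -> None:
        try:
            results[index] = await async_fn(value)
        finally:
            slots.release()  # free the slot even if async_fn raises

    index = 0
    async for value in args:
        await slots.acquire()  # like state.wait_task_can_start()
        tasks.append(asyncio.create_task(worker(index, value)))
        index += 1

    # Unlike an anyio task group, gather() does not cancel siblings on failure.
    await asyncio.gather(*tasks)
    return [results[i] for i in range(index)]
```

The manual index counter mirrors the proposal above: the built-in enumerate() does not accept async iterables, so the index has to be tracked by hand to keep results in input order.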

florimondmanca reopened this May 4, 2024
@florimondmanca
Owner

florimondmanca commented May 4, 2024

Oops, closing was a misclick.

Right! Thanks so much for the code samples and investigation.

So now it's clearer what needs to be done: the list argument should be expanded to accept async iterables. "Sync" arguments (lists, iterables, etc.) should be wrapped in an async iterable, so that amap can always be programmed against an async iterable.
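A minimal sketch of that wrapping step, with as_async_iterable as a hypothetical helper name. It uses no asyncio or trio calls, so it should work under any anyio backend. One caveat: a sync iterator that blocks (for example, one doing network I/O) would still block the event loop each time it is advanced.

```python
import collections.abc
from typing import AsyncIterable, AsyncIterator, Iterable, TypeVar, Union

T = TypeVar("T")


async def _iterate(iterable: Iterable[T]) -> AsyncIterator[T]:
    # Plain async generator over a sync iterable; no backend-specific calls.
    for item in iterable:
        yield item


def as_async_iterable(args: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterable[T]:
    """Hypothetical helper: pass async iterables through unchanged, wrap sync ones."""
    if isinstance(args, collections.abc.AsyncIterable):
        return args
    return _iterate(args)
```

With this in place, the task-starting loop only needs a single async for code path, whichever kind of argument the caller passes.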

If you're up for giving this a go, I'd be happy to review a PR (even a partial one) towards this.

@Ikatono
Author

Ikatono commented May 4, 2024

Sure! So the change needs to be applied to amap.py, run_all.py, run_any.py, and run_on_each.py, plus a new set of test cases covering async iterators, right? Am I missing anything?

@florimondmanca
Owner

florimondmanca commented May 4, 2024

In short, yes!

Perhaps an update to the API reference as well, or an example of using an async iterator.

@Ikatono
Author

Ikatono commented May 7, 2024

Hi @florimondmanca,

Can you confirm whether the tests pass for you on 0.5.0? About half the tests are failing for me on WSL; it looks like a dependency issue that I'm having a hard time tracking down.

@florimondmanca
Owner

It's very possible that there's a dependency issue; this happens from time to time, as transitive dependencies can introduce breakage.

If you open a PR with the current state of your code, we'll be able to see what that looks like in CI.

@Ikatono
Author

Ikatono commented May 11, 2024

I pulled the repo on a Linux machine this time, and the tests passed without issue.
