swap the stream contexts with the taskgroup context #27

Open
graingert opened this issue Jul 5, 2021 · 5 comments

Comments

@graingert
Contributor

graingert commented Jul 5, 2021

        with send_channel, receive_channel:
            async with anyio.create_task_group() as task_group:

might be better as

        async with anyio.create_task_group() as task_group:
            with send_channel, receive_channel:

This way, exiting the amap context manager cleanly (i.e. __aexit__(None, None, None)) would let currently running tasks finish while preventing new tasks from being added. Exiting it with an exception (i.e. __aexit__(type[T], T, tb)) would still cancel all tasks.

@florimondmanca
Owner

florimondmanca commented Jul 5, 2021

@graingert Interesting. Would you happen to have a concrete use case that this would address? A precise, stripped-down code snippet would help.

@graingert
Contributor Author

@florimondmanca looking for a soft-cancel approach for @pfmoore here agronholm/anyio#333 (comment)

@florimondmanca
Owner

florimondmanca commented Jul 5, 2021

@graingert Applying your suggestion, I get a ClosedResourceError on the following example:

import random

import aiometer
import trio


async def process(item: int) -> int:
    delay = random.random()
    await trio.sleep(delay)
    return item


@trio.run
async def main() -> None:
    items = list(range(10))

    async with aiometer.amap(process, items) as results:
        async for r in results:
            result = r
            break

    print(result)

Traceback (most recent call last):
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/debug/example.py", line 14, in <module>
    async def main() -> None:
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1932, in run
    raise runner.main_task_outcome.error
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/debug/example.py", line 20, in main
    break
  File "/Users/florimond/.pyenv/versions/3.9.0/lib/python3.9/contextlib.py", line 182, in __aexit__
    await self.gen.__anext__()
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/src/aiometer/_impl/amap.py", line 88, in _amap
    yield receive_channel
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/anyio/_backends/_trio.py", line 139, in __aexit__
    return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
    raise combined_error_from_nursery
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/src/aiometer/_impl/amap.py", line 77, in sender
    await run_on_each(
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/src/aiometer/_impl/run_on_each.py", line 60, in run_on_each
    task_group.start_soon(_worker, async_fn, index, value, config)
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/anyio/_backends/_trio.py", line 139, in __aexit__
    return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
    raise combined_error_from_nursery
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/src/aiometer/_impl/run_on_each.py", line 24, in _worker
    await config.send_to.send(result)
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/anyio/streams/memory.py", line 191, in send
    self.send_nowait(item)
  File "/Users/florimond/Developer/florimondmanca-projects/aiometer/venv/lib/python3.9/site-packages/anyio/streams/memory.py", line 173, in send_nowait
    raise ClosedResourceError
anyio.ClosedResourceError

I believe this is because send_channel and receive_channel are now closed before the task group exits. That leaves a window in which a sender can still call .send() just before tasks are cancelled, which raises ClosedResourceError.

See what happens if I add some print() calls in _worker() in run_on_each.py, around the call to .send():

        print(f"sending {result}...")
        await config.send_to.send(result)
        print(f"sent {result}")

The chain of events is:

sending 3...  # first_result
sent 3
done  # broke out of loop, `aexit` runs
sending 7...  # Causes the exception below
[...] ClosedResourceError [...]

Which is why I was asking for a specific use case example. In what situation does this "soft-cancel" apply?

@graingert
Contributor Author

graingert commented Jul 5, 2021

you would do

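import anyio


async def worker(async_fn, fount, sink):
    # Close both ends when the worker exits, whatever the reason.
    with fount, sink:
        async for item in fount:
            result = await async_fn(item)
            try:
                await sink.send(result)
            except anyio.ClosedResourceError:
                # The sink was closed elsewhere: stop taking new items.
                break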

this way closing the pipes cleanly stops the flow of work, but not the work itself

@graingert
Contributor Author

also anyio 3 supports taskgroup.start so you can limit tasks with:

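async def execute_task(semaphore, async_fn, /, *args, task_status, **kwargs):
    async with semaphore:
        # Report "started" only once a slot is held, so tg.start() blocks
        # the spawning loop while the limit is reached.
        task_status.started()
        await async_fn(*args, **kwargs)

# `semaphore` (e.g. anyio.Semaphore(n)) and `tasks` are assumed to exist.
async with anyio.create_task_group() as tg:
    for task in tasks:
        await tg.start(execute_task, semaphore, task)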
