
pipes from subprocess_exec do not have extra info 'pipe' (incompatible with vanilla asyncio) #595

Open
jensbjorgensen opened this issue Feb 22, 2024 · 1 comment

Comments

@jensbjorgensen
Contributor

  • uvloop version:
    0.19.0

  • Python version:
    3.10.12

  • Platform:
    linux

  • Can you reproduce the bug with PYTHONASYNCIODEBUG in env?:
    yes

  • Does uvloop behave differently from vanilla asyncio? How?:
    Yes, which is the point of this report. After getting the process transport from subprocess_exec(...), you can call get_pipe_transport(0) to access the transport associated with stdin. With vanilla asyncio, calling get_extra_info('pipe') on that transport returns the underlying pipe object. With uvloop, get_extra_info('pipe') on the same transport returns None.
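
To make the expected behavior concrete, here is a minimal sketch that checks whether the stdin transport exposes the 'pipe' extra info. The names ExitWatcher, stdin_pipe_extra_info and check_pipe_extra_info are made up for this illustration, and it assumes a Unix platform. With the default asyncio loop the check should pass; passing uvloop.new_event_loop instead would be one way to compare the two implementations.

```python
import asyncio
import sys


class ExitWatcher(asyncio.SubprocessProtocol):
    """Hypothetical helper: resolves a future once the child has exited."""

    def __init__(self, exited: asyncio.Future) -> None:
        self._exited = exited

    def process_exited(self) -> None:
        if not self._exited.done():
            self._exited.set_result(None)


async def stdin_pipe_extra_info() -> object:
    """Return whatever get_extra_info('pipe') yields for the child's stdin."""
    loop = asyncio.get_running_loop()
    exited = loop.create_future()
    transport, _ = await loop.subprocess_exec(
        lambda: ExitWatcher(exited),
        sys.executable, "-c", "import sys; sys.stdin.read()",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    stdin_transport = transport.get_pipe_transport(0)
    pipe = stdin_transport.get_extra_info("pipe")
    stdin_transport.close()  # EOF on stdin lets the child exit
    await exited
    transport.close()
    return pipe


def check_pipe_extra_info(new_event_loop=asyncio.new_event_loop) -> bool:
    """True if the loop's stdin transport exposes a non-None 'pipe'."""
    loop = new_event_loop()
    try:
        return loop.run_until_complete(stdin_pipe_extra_info()) is not None
    finally:
        loop.close()
```

Calling check_pipe_extra_info() with no argument exercises vanilla asyncio only, so the sketch runs without uvloop installed.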

I can see in the source that when a pipe transport is created via loop.create_write_pipe(...) we have:

        transp = WriteUnixTransport.new(self, proto, None, waiter)
        transp._add_extra_info('pipe', pipe)

However, no transp._add_extra_info(...) call is made on the WriteUnixTransport that is created inside the UVProcessTransport code. There are ways to work around this (don't depend on getting access to that pipe), but this works fine in the asyncio loop implementation. I imagine it could be fixed without too much pain by wrapping the raw file descriptor created in the process transport code, but I'm not fluent in Cython, so I didn't attempt a patch.
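
For reference, the workaround of not depending on the pipe object could look roughly like the sketch below: it writes through the stdin transport's own write() and close() methods instead of fetching 'pipe'. The Collector class and echo_via_transport function are hypothetical names for this example, and the child is a small Python one-liner standing in for cat.

```python
import asyncio
import sys


class Collector(asyncio.SubprocessProtocol):
    """Hypothetical helper: gathers the child's stdout until everything is closed."""

    def __init__(self, done: asyncio.Future) -> None:
        self._done = done
        self.chunks = []

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        if fd == 1:  # stdout of the child
            self.chunks.append(data)

    def connection_lost(self, exc) -> None:
        # Called after the process exited and all of its pipes were closed.
        if not self._done.done():
            self._done.set_result(None)


async def echo_via_transport(payload: bytes) -> bytes:
    """Send payload to the child's stdin via the transport, return its stdout."""
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    transport, protocol = await loop.subprocess_exec(
        lambda: Collector(done),
        sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    stdin_transport = transport.get_pipe_transport(0)
    stdin_transport.write(payload)  # no get_extra_info('pipe') needed
    stdin_transport.close()         # flushes pending data, then signals EOF
    await done
    transport.close()
    return b"".join(protocol.chunks)


def run_echo(payload: bytes = b"hello, test") -> bytes:
    return asyncio.run(echo_via_transport(payload))
```

This only relies on the documented WriteTransport interface, so it does not depend on which extra info keys a given loop implementation provides.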

@jensbjorgensen
Contributor Author

import asyncio
import typing
import uvloop

class Test(asyncio.SubprocessProtocol):
    def __init__(self, asyncio_impl):
        self._exited = asyncio.Event()
        self._loop = asyncio_impl.new_event_loop()
        self._task = self._loop.create_task(self.test_subprocess_pipe_extra_info())
        self._task.add_done_callback(self._task_done)
        self._ok = True

    def _task_done(self, task: asyncio.Task) -> None:
        if not task.cancelled() and task.exception():
            self._ok = False
            task.print_stack()
        self._loop.stop()
        
    def run(self) -> bool:
        self._loop.run_forever()
        return self._ok
        
    async def test_subprocess_pipe_extra_info(self) -> None:
        proc_trans, _ = await self._loop.subprocess_exec(lambda : self, 'cat', stdin=asyncio.subprocess.PIPE)
        proc_stdin = proc_trans.get_pipe_transport(0)
        pipe = proc_stdin.get_extra_info('pipe')
        pipe.write('hello, test'.encode())
        pipe.close()
        await self._exited.wait()
        self._ok = True

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        print(f"pipe_data_received: {fd} {repr(data)}")

    def pipe_connection_lost(self, fd: int, exc: typing.Optional[Exception]) -> None:
        pass

    def process_exited(self):
        self._exited.set()

for impl_s in ('asyncio', 'uvloop'):
    if Test(globals()[impl_s]).run():
        print(f'{impl_s} works as expected')
    else:
        print(f'{impl_s} is broken it seems (see stack trace)')
