Unsafe implementation of WebsocketPool #10

Open
dmkulazhenko opened this issue Apr 24, 2024 · 0 comments

    async def start(self) -> None:
        """
        Initialises the correct number of connections
        Restarts the websocket pool if run while already connected
        """
        if self.connected:
            await self.quit()
        # Creates a number of sockets equal to the maximum pool size
        sockets = await gather(
            *(
                connect(
                    self._url,
                    max_size=self._max_payload_size,
                    ping_interval=self._timeout,
                )
                for _ in range(self._max_pool_size)
            )
        )
        await gather(*(self._sockets.put(socket) for socket in sockets))
        self._sockets_used = 0
        self.connected = True

    @asynccontextmanager
    async def get_socket(self) -> WebSocketClientProtocol:
        """
        :return: Returns a list of websockets to use
        The websockets will be returned to the main pool upon exiting the with statement in which this should be called
        """
        # Ensures the batch size returned does not exceed the limit
        if not self.connected:
            # Ensures that get_socket can be called without needing to explicitly call start() beforehand
            await self.start()
        socket = await self._sockets.get()
        try:
            self._sockets_used += 1
            yield socket
        finally:
            self._sockets.task_done()
            self._sockets.put_nowait(socket)
            self._sockets_used -= 1

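Here, the WebsocketPool.get_socket context manager calls WebsocketPool.start whenever self.connected is False. If several coroutines enter get_socket before the first start() has finished, each of them still sees self.connected as False, so WebsocketPool.start runs concurrently. Each of those calls opens its own full set of connections at sockets = await gather(connect(...)), which leaks websocket connections.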
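
To make the race concrete, here is a minimal reproduction sketch. It does not use the real library: `fake_connect`, `FakeSocket` and `RacyPool` are hypothetical stand-ins that only mirror the control flow of the snippet above (the `quit()` call and the `_sockets_used` bookkeeping are left out for brevity).

```python
import asyncio
from contextlib import asynccontextmanager

opened = 0  # counts every fake connection ever opened


class FakeSocket:
    """Hypothetical stand-in for a websocket connection."""


async def fake_connect():
    """Hypothetical stand-in for connect(); yields to the event loop like a real handshake."""
    global opened
    await asyncio.sleep(0)
    opened += 1
    return FakeSocket()


class RacyPool:
    """Mirrors the start()/get_socket() flow from the snippet above."""

    def __init__(self, max_pool_size):
        self._max_pool_size = max_pool_size
        self._sockets = asyncio.Queue()
        self.connected = False

    async def start(self):
        # Suspends here, so other callers can run before connected is set
        sockets = await asyncio.gather(
            *(fake_connect() for _ in range(self._max_pool_size))
        )
        for socket in sockets:
            self._sockets.put_nowait(socket)
        self.connected = True

    @asynccontextmanager
    async def get_socket(self):
        if not self.connected:  # every early caller passes this check
            await self.start()
        socket = await self._sockets.get()
        try:
            yield socket
        finally:
            self._sockets.put_nowait(socket)


async def use(pool):
    async with pool.get_socket():
        await asyncio.sleep(0)


async def demo(callers=3, pool_size=2):
    global opened
    opened = 0
    pool = RacyPool(pool_size)
    await asyncio.gather(*(use(pool) for _ in range(callers)))
    # (connections opened, sockets sitting in the pool afterwards)
    return opened, pool._sockets.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (6, 6) with 3 callers and a pool size of 2
```

With three concurrent callers and a pool size of 2, all three callers pass the `if not self.connected` check before any `start()` completes, so six connections are opened instead of two.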

Here are a couple of ideas for how you can improve your websocket pool:

  • Require pool initialization before acquire. Yes, you have a feature that lets a websocket be acquired without initialization (without calling start()), but I think nowadays that is considered bad practice. It would be much safer and more transparent to have a context manager like initialize() that handles pool creation / destruction;
  • Use asyncio locks around pool initialization / destruction (the easy way, but not a great one);
  • Do not use await around pool initialization / destruction, so no context switch can happen 😃 You can take some inspiration from https://github.com/fellowapp/asyncio-connection-pool/blob/main/asyncio_connection_pool/__init__.py
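
As a rough sketch of the first two ideas combined, something like the following could work. Everything here is an assumption for illustration: `LockedPool`, `FakeSocket`, `_connect` and the `opened` counter are hypothetical names, not the project's API, and `_connect` stands in for the real `connect(...)` call. Note that `start()` in this sketch is idempotent rather than restarting the pool, which is a deliberate change from the original behaviour.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSocket:
    """Hypothetical stand-in for a websocket connection."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


class LockedPool:
    def __init__(self, max_pool_size):
        self._max_pool_size = max_pool_size
        self._sockets = asyncio.Queue()
        self._lifecycle_lock = asyncio.Lock()  # serialises start() and quit()
        self.connected = False
        self.opened = 0  # bookkeeping for the demo only

    async def _connect(self):
        # Hypothetical stand-in for connect(self._url, ...)
        await asyncio.sleep(0)
        self.opened += 1
        return FakeSocket()

    async def start(self):
        async with self._lifecycle_lock:
            # Re-check under the lock: another caller may have finished first
            if self.connected:
                return
            sockets = await asyncio.gather(
                *(self._connect() for _ in range(self._max_pool_size))
            )
            for socket in sockets:
                self._sockets.put_nowait(socket)
            self.connected = True

    async def quit(self):
        async with self._lifecycle_lock:
            # Closes idle sockets only; sockets still in use are out of scope here
            while not self._sockets.empty():
                await self._sockets.get_nowait().close()
            self.connected = False

    @asynccontextmanager
    async def initialize(self):
        """Explicit lifecycle: the pool exists only inside the with block."""
        await self.start()
        try:
            yield self
        finally:
            await self.quit()

    @asynccontextmanager
    async def get_socket(self):
        if not self.connected:
            await self.start()  # safe now: concurrent callers queue on the lock
        socket = await self._sockets.get()
        try:
            yield socket
        finally:
            self._sockets.put_nowait(socket)


async def use(pool):
    async with pool.get_socket():
        await asyncio.sleep(0)


async def demo(callers=3, pool_size=2):
    pool = LockedPool(pool_size)
    await asyncio.gather(*(use(pool) for _ in range(callers)))
    opened, idle = pool.opened, pool._sockets.qsize()
    await pool.quit()
    return opened, idle, pool._sockets.qsize()
```

Callers that prefer the explicit style would write `async with pool.initialize(): ...` and acquire sockets inside that block. The lock only matters for the lazy path, where several callers may reach `start()` at once.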