Websocket Logic Refactor - start/stop mechanism #232

Closed
Trevypants opened this issue Jun 12, 2024 · 6 comments · Fixed by #233 or #230

@Trevypants

Describe the solution you'd like

I am working with the websocket client and I am missing a way to explicitly start and stop the connection.

  1. Starting
    Currently, the WS client opens a connection the moment it is initialized. I find this restrictive, since there will likely be scenarios where a user wants to create the client and keep it on standby without connecting right away. An alternative would be a start method (naming TBD) that opens the WS connection.

  2. Stopping
    Currently, there is no way for the user to stop the WS connection. Looking into the code, I see that the connection is fully stopped only when an exception occurs. When testing the WS client on my side, stopping the program under this behavior leads to multiple runtime errors. A stop method (naming TBD) that closes all WS connections would really help here and would also make the WS client more flexible to use.

Describe alternatives you've considered

I have implemented a hacky solution on my side with an example start and stop method for the spot WS client. The code is shown below. A few points to note:

  • For the base connector, I have centered the stopping logic on the keep_alive variable (now a class attribute). Instead of manually cancelling the WS coroutine task, setting keep_alive to False lets the coroutines end gracefully without needing to catch the asyncio.CancelledError exception.
  • The base connector now has a start method that creates the WS task, instead of the task being created in the __init__ method.
  • The base connector now has a stop method that stops the running WS connection by setting keep_alive to False and then awaiting the task. Once the value is False, all of the while loops end and the task completes normally, instead of the original logic of waiting for an exception. This allows for a graceful shutdown.
  • The base WS client now has start and stop methods that call the connectors' start and stop methods.
# kraken/spot/websocket/connectors.py

class ConnectSpotWebsocketBase:
    def __init__(
        self: ConnectSpotWebsocketBase,
        client: KrakenSpotWSClientBase,
        endpoint: str,
        callback: Callable,
        *,
        is_auth: bool = False,
    ) -> None:
        # Initialization code is the same ...
        # This create_task line has been moved to the new `start` method
        # self.task: asyncio.Task = asyncio.create_task(self.__run_forever())

    async def start(self: ConnectSpotWebsocketBase) -> None:
        """Starts the websocket connection"""
        if hasattr(self, "task") and not self.task.done():
            return
        self.task = asyncio.create_task(self.__run_forever())

    async def stop(self: ConnectSpotWebsocketBase) -> None:
        """Stops the websocket connection"""
        self.keep_alive = False
        if hasattr(self, "task") and not self.task.done():
            await self.task

    async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
        """
        This function establishes the websocket connection and runs until
        some error occurs.

        :param event: Event used to control the information flow
        :type event: asyncio.Event
        """
        self._last_ping = time()
        self.ws_conn_details = (
            None if not self.__is_auth else self.__client.get_ws_token()
        )
        self.LOG.debug(
            "Websocket token: %s",
            self.ws_conn_details,
        )

        async with websockets.connect(  # type: ignore
            f"wss://{self.__ws_endpoint}",
            extra_headers={"User-Agent": "python-kraken-sdk"},
            ping_interval=30,
        ) as socket:
            self.LOG.info("Websocket connected!")
            self.socket = socket

            if not event.is_set():
                await self.send_ping()
                event.set()
            self.__reconnect_num = 0

            while self.keep_alive:   # <----- Loop has been changed to depend on class variable
                if time() - self._last_ping > self.PING_INTERVAL:
                    await self.send_ping()
                try:
                    _message = await asyncio.wait_for(self.socket.recv(), timeout=5)  # <---- Timeout was decreased to avoid lengthy stop times
                except asyncio.TimeoutError:  # important; the builtin TimeoutError only matches here on Python 3.11+
                    pass  # <----- Removed sending the ping since the timeout was decreased
                except asyncio.CancelledError:
                    self.LOG.exception("asyncio.CancelledError")
                    self.keep_alive = False
                    await self.__callback({"error": "asyncio.CancelledError"})
                else:
                    try:
                        message: dict = json.loads(_message)
                    except ValueError:
                        self.LOG.warning(_message)
                    else:
                        self.LOG.debug(message)
                        self._manage_subscriptions(message=message)
                        await self.__callback(message)

    async def __run_forever(self: ConnectSpotWebsocketBase) -> None:
        """
        This function ensures the reconnects.

        todo: This is stupid. There must be a better way for passing
              the raised exception to the client class - not
              through this ``exception_occur`` flag
        """
        self.keep_alive = True  # <----- Starting the run forever method now is based on the keep_alive variable
        try:
            while self.keep_alive: # <------ Notice the loop
                await self.__reconnect()
        except MaxReconnectError:
            await self.__callback(
                {"error": "kraken.exceptions.MaxReconnectError"},
            )
        except Exception as exc:
            traceback_: str = traceback.format_exc()
            logging.exception(
                "%s: %s",
                exc,
                traceback_,
            )
            await self.__callback({"error": traceback_})
        # finally:
        #     await self.__callback(
        #         {"error": "Exception stopped the Kraken Spot Websocket Client!"},
        #     )
        #     self.__client.exception_occur = True

    async def __reconnect(self: ConnectSpotWebsocketBase) -> None:
        """
        Handles the reconnect - before starting the connection and after an
        error.

        :raises KrakenException.MaxReconnectError: If there are too many
            reconnect retries
        """
        self.LOG.info("Websocket start connect/reconnect")

        self.__reconnect_num += 1
        if self.__reconnect_num >= self.MAX_RECONNECT_NUM:
            raise MaxReconnectError(
                "The Kraken Spot websocket client encountered to many reconnects!",
            )

        reconnect_wait: float = self.__get_reconnect_wait(self.__reconnect_num)
        self.LOG.debug(
            "asyncio sleep reconnect_wait=%.1f s reconnect_num=%d",
            reconnect_wait,
            self.__reconnect_num,
        )
        await asyncio.sleep(reconnect_wait)

        event: asyncio.Event = asyncio.Event()
        tasks: list[asyncio.Task] = [
            asyncio.create_task(self._recover_subscriptions(event)),
            asyncio.create_task(self.__run(event)),
        ]

        while self.keep_alive:  # <----- Retrying on exceptions still applies, but only until keep_alive is set to False
            finished, pending = await asyncio.wait(
                tasks,
                return_when=asyncio.ALL_COMPLETED,
            )
            exception_occur: bool = False
            for task in finished:
                if task.exception():
                    exception_occur = True
                    traceback.print_stack()
                    message: str = f"{task} got an exception {task.exception()}\n {task.get_stack()}"
                    self.LOG.warning(message)
                    for process in pending:
                        self.LOG.warning("pending %s", process)
                        try:
                            process.cancel()
                        except asyncio.CancelledError:
                            self.LOG.exception("asyncio.CancelledError")
                    await self.__callback({"error": message})
            if exception_occur:
                break
        self.LOG.warning("Connection closed")
# kraken/spot/websocket/__init__.py

class KrakenSpotWSClientBase(KrakenSpotBaseAPI):
    
    async def start(self):
        """Method to start the websocket connection."""
        await self._pub_conn.start()  # type: ignore
        await self._priv_conn.start()  # type: ignore

    async def stop(self):
        """Method to stop the websocket connection."""
        await self._pub_conn.stop()  # type: ignore
        await self._priv_conn.stop()  # type: ignore
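
The flag-based stopping logic described in the bullets above can be sketched in isolation. This is only a minimal illustration under stated assumptions: `FlagControlledWorker`, `feed` and the internal queue are hypothetical stand-ins for the connector and its socket, not part of python-kraken-sdk. It shows a receive loop that polls with a short timeout so that `stop()` returns quickly once `keep_alive` is cleared.

```python
from __future__ import annotations

import asyncio


class FlagControlledWorker:
    """Hypothetical stand-in for a connector; not part of python-kraken-sdk."""

    def __init__(self) -> None:
        self.keep_alive: bool = False
        self.task: asyncio.Task | None = None
        self.received: list[str] = []
        self._queue: asyncio.Queue | None = None

    async def start(self) -> None:
        """Create the background task unless one is already running."""
        if self.task is not None and not self.task.done():
            return
        self.keep_alive = True
        self._queue = asyncio.Queue()  # stands in for the websocket
        self.task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Clear the flag and wait for the loop to finish on its own."""
        self.keep_alive = False
        if self.task is not None and not self.task.done():
            await self.task

    async def feed(self, message: str) -> None:
        """Simulate an incoming message (illustration only)."""
        assert self._queue is not None, "call start() first"
        await self._queue.put(message)

    async def _run(self) -> None:
        assert self._queue is not None
        while self.keep_alive:
            try:
                # A short timeout bounds how long stop() has to wait.
                message = await asyncio.wait_for(self._queue.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue  # nothing received; re-check the flag
            self.received.append(message)


async def demo() -> list[str]:
    worker = FlagControlledWorker()
    await worker.start()
    await worker.feed("ticker update")
    await asyncio.sleep(0.1)  # give the loop time to pick the message up
    await worker.stop()
    return worker.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the flag is set in `start()` rather than inside the task, so a `stop()` issued before the task has taken its first step is not overwritten by a later `keep_alive = True`.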

Additional context

Here is an example usage of my hacky solution :)

import asyncio

from kraken.spot import KrakenSpotWSClientV2


class Client(KrakenSpotWSClientV2):
    """Can be used to create a custom trading strategy"""

    async def on_message(self, message):
        """Receives the websocket messages"""
        if message.get("method") == "pong" or message.get("channel") == "heartbeat":
            return

        print(message)


async def main():
    # Does not open the WS connection yet
    client = Client()

    # Open
    await client.start()

    await client.subscribe(
        params={"channel": "ticker", "symbol": ["BTC/USD", "DOT/USD"]}
    )

    # Wait a few seconds
    await asyncio.sleep(10)

    # Unsubscribe
    await client.unsubscribe(
        params={"channel": "ticker", "symbol": ["BTC/USD", "DOT/USD"]}
    )

    # Close
    await client.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
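
With explicit start and stop methods, one possible way to make sure stop is always called, even when the strategy code raises, is to wrap the pair in an async context manager. The following is a sketch only: `DummyClient` and `running` are hypothetical names used for illustration, and the real client is assumed to expose the `start()`/`stop()` coroutines proposed in this issue.

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class DummyClient:
    """Hypothetical stand-in exposing the proposed start()/stop() coroutines."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def start(self) -> None:
        self.events.append("started")

    async def stop(self) -> None:
        self.events.append("stopped")


@asynccontextmanager
async def running(client: DummyClient) -> AsyncIterator[DummyClient]:
    """Start the client on entry and always stop it on exit."""
    await client.start()
    try:
        yield client
    finally:
        await client.stop()


async def demo() -> list[str]:
    client = DummyClient()
    try:
        async with running(client):
            # Simulate a failure inside the strategy code.
            raise RuntimeError("strategy failed")
    except RuntimeError:
        pass
    return client.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The `finally` clause runs whether the body exits normally or through an exception, so the connection is not left open by accident.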
@btschwertfeger btschwertfeger added this to the v3.0.0 milestone Jun 14, 2024
@btschwertfeger btschwertfeger self-assigned this Jun 14, 2024
@btschwertfeger btschwertfeger added enhancement New feature or request Futures Topic related to Futures trading Spot Topic related to Spot trading Breaking Breaking changes labels Jun 14, 2024
@btschwertfeger btschwertfeger changed the title Websocket Logic Refactor Websocket Logic Refactor - start/stop mechanism Jun 15, 2024
@btschwertfeger btschwertfeger linked a pull request Jun 15, 2024 that will close this issue
21 tasks
@btschwertfeger
Owner

Hey @Trevypants, thanks again for that suggestion! I know that back then I was looking for exactly this behavior, but somehow I never sat down to change anything there, since nobody had complained so far.

Currently I'm reworking the project; your suggested changes will be part of version 3.0.0. Let me know if you have any further ideas that might be worth integrating!

@Trevypants
Author

Thank you :) I will make other issues if I can think of any extra ideas

@OpenCoderX

What pattern should I use to recover from a CancelledError? I want to keep the socket open to monitor for executions on my account, but it seems the CancelledError is always raised.

@btschwertfeger
Owner

What pattern should I use to recover from a CancelledError? I want to keep the socket open to monitor for executions on my account, but it seems the CancelledError is always raised.

What do you mean by "the CancelledError is always raised"? Could you please share the relevant lines?

In v3.0.0 of the SDK, there will be the start/stop feature, as proposed here, meaning that you can start and stop the connection at any time. For the current versions of the SDK, you will need to instantiate the websocket client again.
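
As general asyncio background for the CancelledError question (this does not explain why the SDK reports the error in a particular setup), the usual pattern is to catch `asyncio.CancelledError` only for cleanup and then re-raise it, so the task actually ends up cancelled. A small self-contained sketch follows; `monitor` and `demo` are hypothetical names.

```python
import asyncio


async def monitor(log: list) -> None:
    """Loop until cancelled; clean up and then propagate the cancellation."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cleanup")
        raise  # re-raise so the task is marked as cancelled


async def demo():
    log = []
    task = asyncio.create_task(monitor(log))
    await asyncio.sleep(0.05)  # let the monitor run for a moment
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Swallowing the exception inside a `while True` loop instead would keep the coroutine running after a cancellation request, which can make shutdown hang.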

@OpenCoderX

OpenCoderX commented Jun 22, 2024

Sorry for the lack of clarity. This is what I did to get a basic example of an authenticated user execution socket up and running without seeing the program end with CancelledError. Please let me know if it is an acceptable pattern to follow until 3.0 is released, thanks for the help and work on this. I didn't need to instantiate the WebSocket more than once. I'm just starting to wrap my brain around asyncio, so I'm not sure if this pattern is acceptable.

"""
Module that provides an example usage for the KrakenSpotWebsocketClient.
It uses the Kraken Websocket API v2.
"""

from __future__ import annotations

import asyncio
import contextlib
import logging
import logging.config
import os
from contextlib import suppress
from kraken.spot import KrakenSpotWSClientV2
logging.basicConfig(
    format="%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s",
    datefmt="%Y/%m/%d %H:%M:%S",
    level=logging.INFO,
)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)


async def main(api_key: str, secret_key: str) -> None:
    key: str = api_key
    secret: str = secret_key

    class Client(KrakenSpotWSClientV2):
        """Can be used to create a custom trading strategy"""
        async def on_message(self: Client, message: dict) -> None:
            """Receives the websocket messages"""
            if message.get("method") == "pong" or message.get("channel") == "heartbeat":
                print('got pong or heartbeat')
                return
            print(message)

            if message.get('event') == 'asyncio.CancelledError':
                print('got asyncio.CancelledError')
                print('doing nothing')
                return
            # now you can access lots of methods, for example to create an order:
            # if self._is_auth:  # only if the client is authenticated …
            #     await self.send_message(
            #         message={
            #             "method": "add_order",
            #             "params": {
            #                 "limit_price": 1234.56,
            #                 "order_type": "limit",
            #                 "order_userref": 123456789,
            #                 "order_qty": 1.0,
            #                 "side": "buy",
            #                 "symbol": "BTC/USD",
            #                 "validate": True,
            #             },
            #         }
            #     )
            # ... it is also possible to call regular REST endpoints
            # but using the websocket messages is more efficient.
            # You can also un-/subscribe here using self.subscribe/self.unsubscribe.

    if key and secret:
        # Per default, the authenticated client starts two websocket connections,
        # one for authenticated and one for public messages. If there is no need
        # for a public connection, it can be disabled using the ``no_public``
        # parameter.
        client_auth = Client(key=key, secret=secret, no_public=True)
        print(client_auth.private_channel_names)  # … list private channel names
        # when using the authenticated client, you can also subscribe to public feeds
        await client_auth.subscribe(params={"channel": "executions"})

        while True:
            try:  # TRY ADDED HERE
                logging.info("Running main loop")
                await asyncio.sleep(5)
            except asyncio.CancelledError:
                logging.info("Main task was cancelled")
                # Handle cleanup if necessary, but don't close the connection
                pass
            # await client_auth.unsubscribe(params={"channel": "executions"})

if __name__ == "__main__":
    with suppress(KeyboardInterrupt):
        asyncio.run(main(api_key='yyyyy', secret_key='xxxxx'))
    # The websocket client will send {'event': 'asyncio.CancelledError'}
    # via on_message so you can handle the behavior/next actions
    # individually within your strategy.

@btschwertfeger
Owner

One option would be to put the while loop into a separate function and run it until client.exception_occur is set. Once client.exception_occur is set and the loop exits, you could then reinstantiate the client and call the function containing the loop again.
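
A rough sketch of that restart loop, under loud assumptions: `FlakyClient`, `run_until_failure`, `supervise` and `max_restarts` are hypothetical names, and only the `exception_occur` attribute mirrors the one mentioned above. With the real client, the subscription would be set up again after each reinstantiation.

```python
import asyncio


class FlakyClient:
    """Hypothetical stand-in; only `exception_occur` mirrors the real attribute."""

    def __init__(self, fail_after: int) -> None:
        self.exception_occur = False
        self._fail_after = fail_after
        self._ticks = 0

    def tick(self) -> None:
        """Simulate the connection failing after a few loop iterations."""
        self._ticks += 1
        if self._ticks >= self._fail_after:
            self.exception_occur = True


async def run_until_failure(client: FlakyClient) -> None:
    """The main loop, moved into its own function."""
    while not client.exception_occur:
        client.tick()  # simulation only; a real client sets the flag itself
        await asyncio.sleep(0.01)


async def supervise(max_restarts: int) -> int:
    """Reinstantiate the client each time the loop exits."""
    restarts = 0
    while restarts < max_restarts:
        client = FlakyClient(fail_after=3)
        # With the real client, subscribe again here before looping.
        await run_until_failure(client)
        restarts += 1
    return restarts


if __name__ == "__main__":
    print(asyncio.run(supervise(max_restarts=2)))
```

Bounding the number of restarts is a choice made for this sketch; an unbounded loop with a delay between attempts would be another option.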

3 participants