Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) add okx spot candles + tests #7022

Merged
merged 14 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions hummingbot/data_feed/candles_feed/candles_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from hummingbot.data_feed.candles_feed.kraken_spot_candles.kraken_spot_candles import KrakenSpotCandles
from hummingbot.data_feed.candles_feed.kucoin_spot_candles.kucoin_spot_candles import KucoinSpotCandles
from hummingbot.data_feed.candles_feed.okx_perpetual_candles.okx_perpetual_candles import OKXPerpetualCandles
from hummingbot.data_feed.candles_feed.okx_spot_candles.okx_spot_candles import OKXSpotCandles


class UnsupportedConnectorException(Exception):
Expand All @@ -34,6 +35,7 @@ class CandlesFactory:
"kucoin": KucoinSpotCandles,
"ascend_ex": AscendExSpotCandles,
"okx_perpetual": OKXPerpetualCandles,
"okx": OKXSpotCandles,
"kraken": KrakenSpotCandles
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from hummingbot.core.network_iterator import NetworkStatus, safe_ensure_future
from hummingbot.core.web_assistant.connections.data_types import WSJSONRequest
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.data_feed.candles_feed.candles_base import CandlesBase
from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_perpetual_candles import constants as CONSTANTS
from hummingbot.logger import HummingbotLogger

Expand All @@ -24,6 +26,25 @@ def logger(cls) -> HummingbotLogger:
def __init__(self, trading_pair: str, interval: str = "1m",
max_records: int = CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST):
super().__init__(trading_pair, interval, max_records)
self.interval_to_milliseconds_dict = {
"1s": 1000,
"1m": 60000,
"3m": 180000,
"5m": 300000,
"15m": 900000,
"30m": 1800000,
"1h": 3600000,
"2h": 7200000,
"4h": 14400000,
"6h": 21600000,
"8h": 28800000,
"12h": 43200000,
"1d": 86400000,
"3d": 259200000,
"1w": 604800000,
"1M": 2592000000,
"3M": 7776000000
}

@property
def name(self):
Expand Down Expand Up @@ -106,6 +127,27 @@ async def fill_historical_candles(self):
)
await self._sleep(1.0)

async def get_historical_candles(self, config: HistoricalCandlesConfig):
try:
all_candles = []
current_start_time = config.start_time
while current_start_time <= config.end_time:
current_end_time = current_start_time + self.interval_to_milliseconds_dict[config.interval] * CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST
fetched_candles = await self.fetch_candles(end_time=current_end_time)
if fetched_candles.size == 0:
break

all_candles.append(fetched_candles[::-1])
last_timestamp = fetched_candles[0][0] # Assuming the first column is the timestamp
current_start_time = int(last_timestamp)

final_candles = np.concatenate(all_candles, axis=0) if all_candles else np.array([])
candles_df = pd.DataFrame(final_candles, columns=self.columns)
candles_df.drop_duplicates(subset=["timestamp"], inplace=True)
return candles_df
except Exception as e:
self.logger().exception(f"Error fetching historical candles: {str(e)}")

async def _subscribe_channels(self, ws: WSAssistant):
"""
Subscribes to the candles events through the provided websocket connection.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from hummingbot.data_feed.candles_feed.okx_spot_candles.okx_spot_candles import OKXSpotCandles

__all__ = ["OKXSpotCandles"]
42 changes: 42 additions & 0 deletions hummingbot/data_feed/candles_feed/okx_spot_candles/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from bidict import bidict

from hummingbot.core.api_throttler.data_types import LinkedLimitWeightPair, RateLimit

REST_URL = "https://www.okx.com"
WSS_URL = "wss://ws.okx.com:8443/ws/v5/business"

# GET / Candlesticks history (https://www.okx.com/docs-v5/en/?shell#order-book-trading-market-data-get-candlesticks-history)
# Retrieve history candlestick charts from recent years(It is last 3 months supported for 1s candlestick).
# Rate Limit: 20 requests per 2 seconds
# Rate limit rule: IP
CANDLES_ENDPOINT = "/api/v5/market/history-candles"
INTERVALS = bidict({
"1s": "1s",
"1m": "1m",
"3m": "3m",
"5m": "5m",
"15m": "15m",
"30m": "30m",
"1h": "1H",
"2h": "2H",
"4h": "4H",
"6h": "6Hutc",
"8h": "8Hutc",
"12h": "12Hutc",
"1d": "1Dutc",
"3d": "3Dutc",
"1w": "1Wutc",
"1M": "1Mutc",
"3M": "3Mutc"
})
MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST = 100

# Get system time (https://www.okx.com/docs-v5/en/?shell#public-data-rest-api-get-system-time)
# Retrieve API server time.
# Rate Limit: 10 requests per 2 seconds
# Rate limit rule: IP
HEALTH_CHECK_ENDPOINT = "/api/v5/public/time"

RATE_LIMITS = [
RateLimit(CANDLES_ENDPOINT, limit=20, time_interval=2, linked_limits=[LinkedLimitWeightPair("raw", 1)]),
RateLimit(HEALTH_CHECK_ENDPOINT, limit=10, time_interval=2, linked_limits=[LinkedLimitWeightPair("raw", 1)])]
202 changes: 202 additions & 0 deletions hummingbot/data_feed/candles_feed/okx_spot_candles/okx_spot_candles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import asyncio
import logging
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from hummingbot.core.network_iterator import NetworkStatus, safe_ensure_future
from hummingbot.core.web_assistant.connections.data_types import WSJSONRequest
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.data_feed.candles_feed.candles_base import CandlesBase
from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_spot_candles import constants as CONSTANTS
from hummingbot.logger import HummingbotLogger


class OKXSpotCandles(CandlesBase):
_logger: Optional[HummingbotLogger] = None

@classmethod
def logger(cls) -> HummingbotLogger:
if cls._logger is None:
cls._logger = logging.getLogger(__name__)
return cls._logger

def __init__(self, trading_pair: str, interval: str = "1m",
max_records: int = CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST):
super().__init__(trading_pair, interval, max_records)
self.interval_to_milliseconds_dict = {
"1s": 1000,
"1m": 60000,
"3m": 180000,
"5m": 300000,
"15m": 900000,
"30m": 1800000,
"1h": 3600000,
"2h": 7200000,
"4h": 14400000,
"6h": 21600000,
"8h": 28800000,
"12h": 43200000,
"1d": 86400000,
"3d": 259200000,
"1w": 604800000,
"1M": 2592000000,
"3M": 7776000000
}

@property
def name(self):
return f"okx_{self._trading_pair}"

@property
def rest_url(self):
return CONSTANTS.REST_URL

@property
def wss_url(self):
return CONSTANTS.WSS_URL

@property
def health_check_url(self):
return self.rest_url + CONSTANTS.HEALTH_CHECK_ENDPOINT

@property
def candles_url(self):
return self.rest_url + CONSTANTS.CANDLES_ENDPOINT

@property
def rate_limits(self):
return CONSTANTS.RATE_LIMITS

@property
def intervals(self):
return CONSTANTS.INTERVALS

async def check_network(self) -> NetworkStatus:
rest_assistant = await self._api_factory.get_rest_assistant()
await rest_assistant.execute_request(url=self.health_check_url,
throttler_limit_id=CONSTANTS.HEALTH_CHECK_ENDPOINT)
return NetworkStatus.CONNECTED

def get_exchange_trading_pair(self, trading_pair):
return trading_pair

async def fetch_candles(self,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: Optional[int] = 100):
rest_assistant = await self._api_factory.get_rest_assistant()
params = {"instId": self._ex_trading_pair, "bar": CONSTANTS.INTERVALS[self.interval], "limit": limit}
if end_time:
params["after"] = end_time
if start_time:
params["before"] = start_time
candles = await rest_assistant.execute_request(url=self.candles_url,
throttler_limit_id=CONSTANTS.CANDLES_ENDPOINT,
params=params)

arr = [[row[0], row[1], row[2], row[3], row[4], row[6], row[7], 0., 0., 0.] for row in candles["data"]]
return np.array(arr).astype(float)

async def fill_historical_candles(self):
max_request_needed = (self._candles.maxlen // CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST) + 1
requests_executed = 0
while not self.ready:
missing_records = self._candles.maxlen - len(self._candles)
end_timestamp = int(self._candles[0][0])
try:
if requests_executed < max_request_needed:
# we have to add one more since, the last row is not going to be included
candles = await self.fetch_candles(end_time=end_timestamp, limit=missing_records + 1)
# we are computing again the quantity of records again since the websocket process is able to
# modify the deque and if we extend it, the new observations are going to be dropped.
missing_records = self._candles.maxlen - len(self._candles)
self._candles.extendleft(candles[-(missing_records + 1):-1])
requests_executed += 1
else:
self.logger().error(f"There is no data available for the quantity of "
f"candles requested for {self.name}.")
raise
except asyncio.CancelledError:
raise
except Exception:
self.logger().exception(
"Unexpected error occurred when getting historical klines. Retrying in 1 seconds...",
)
await self._sleep(1.0)

async def get_historical_candles(self, config: HistoricalCandlesConfig):
try:
all_candles = []
current_start_time = config.start_time
while current_start_time <= config.end_time:
current_end_time = current_start_time + self.interval_to_milliseconds_dict[config.interval] * CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST
fetched_candles = await self.fetch_candles(end_time=current_end_time)
if fetched_candles.size == 0:
break

all_candles.append(fetched_candles[::-1])
last_timestamp = fetched_candles[0][0] # Assuming the first column is the timestamp
current_start_time = int(last_timestamp)

final_candles = np.concatenate(all_candles, axis=0) if all_candles else np.array([])
candles_df = pd.DataFrame(final_candles, columns=self.columns)
candles_df.drop_duplicates(subset=["timestamp"], inplace=True)
return candles_df
except Exception as e:
self.logger().exception(f"Error fetching historical candles: {str(e)}")

async def _subscribe_channels(self, ws: WSAssistant):
"""
Subscribes to the candles events through the provided websocket connection.
:param ws: the websocket assistant used to connect to the exchange
"""
try:
candle_args = []
candle_args.append({"channel": f"candle{CONSTANTS.INTERVALS[self.interval]}", "instId": self._ex_trading_pair})
payload = {
"op": "subscribe",
"args": candle_args
}
subscribe_candles_request: WSJSONRequest = WSJSONRequest(payload=payload)

await ws.send(subscribe_candles_request)
self.logger().info("Subscribed to public klines...")
except asyncio.CancelledError:
raise
except Exception:
self.logger().error(
"Unexpected error occurred subscribing to public klines...",
exc_info=True
)
raise

async def _process_websocket_messages(self, websocket_assistant: WSAssistant):
async for ws_response in websocket_assistant.iter_messages():
data: Dict[str, Any] = ws_response.data
if data is not None and "data" in data: # data will be None when the websocket is disconnected
candles = data["data"][0]
timestamp = candles[0]
open = candles[1]
high = candles[2]
low = candles[3]
close = candles[4]
volume = candles[6]
quote_asset_volume = candles[7]
n_trades = 0.
taker_buy_base_volume = 0.
taker_buy_quote_volume = 0.

candles_row = np.array([timestamp, open, high, low, close, volume,
quote_asset_volume, n_trades, taker_buy_base_volume,
taker_buy_quote_volume]).astype(float)
if len(self._candles) == 0:
self._candles.append(candles_row)
safe_ensure_future(self.fill_historical_candles())
elif int(timestamp) > int(self._candles[-1][0]):
self._candles.append(candles_row)
elif int(timestamp) == int(self._candles[-1][0]):
self._candles.pop()
self._candles.append(candles_row)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from typing import Awaitable
from unittest.mock import AsyncMock, MagicMock, patch

import numpy as np
from aioresponses import aioresponses

from hummingbot.connector.test_support.network_mocking_assistant import NetworkMockingAssistant
from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_perpetual_candles import OKXPerpetualCandles, constants as CONSTANTS


Expand Down Expand Up @@ -47,6 +49,11 @@ def async_run_with_timeout(self, coroutine: Awaitable, timeout: int = 1):
ret = asyncio.get_event_loop().run_until_complete(asyncio.wait_for(coroutine, timeout))
return ret

def get_fetched_candles_data_mock(self):
candles = self.get_candles_rest_data_mock()
arr = [[row[0], row[1], row[2], row[3], row[4], row[6], row[7], 0., 0., 0.] for row in candles["data"][::-1]]
return np.array(arr).astype(float)

def get_candles_rest_data_mock(self):
data = {
"code": "0",
Expand Down Expand Up @@ -149,6 +156,25 @@ def test_fetch_candles(self, mock_api: aioresponses):
self.assertEqual(resp.shape[0], len(data_mock["data"]))
self.assertEqual(resp.shape[1], 10)

@patch("hummingbot.data_feed.candles_feed.okx_perpetual_candles.OKXPerpetualCandles.fetch_candles", new_callable=AsyncMock)
def test_get_historical_candles(self, fetched_candles_mock):
config = HistoricalCandlesConfig(connector_name="okx_perpetual",
trading_pair=self.ex_trading_pair,
interval=self.interval,
start_time=1705420800000,
end_time=1705431600000)
resp_1 = self.get_fetched_candles_data_mock()
resp_2 = np.array([])
fetched_candles_mock.side_effect = [resp_1, resp_2]
candles_df = self.async_run_with_timeout(self.data_feed.get_historical_candles(config))

# Check the response
self.assertEqual(candles_df.shape[0], len(resp_1))
self.assertEqual(candles_df.shape[1], 10)

# Check candles integrity. Diff should always be interval in milliseconds and keep sign constant
self.assertEqual(len(candles_df["timestamp"].diff()[1:].unique()), 1, "Timestamp diff should be constant")

def test_candles_empty(self):
self.assertTrue(self.data_feed.candles_df.empty)

Expand Down
Empty file.