Commit d053013

Merge pull request #7022 from tomasgaudino/feat/okx-spot-candles-feed
(feat) add okx spot candles + tests
nikspz committed May 20, 2024
2 parents 489601a + 6dcebc5 commit d053013
Showing 8 changed files with 643 additions and 0 deletions.
2 changes: 2 additions & 0 deletions hummingbot/data_feed/candles_feed/candles_factory.py
@@ -10,6 +10,7 @@
from hummingbot.data_feed.candles_feed.kraken_spot_candles.kraken_spot_candles import KrakenSpotCandles
from hummingbot.data_feed.candles_feed.kucoin_spot_candles.kucoin_spot_candles import KucoinSpotCandles
from hummingbot.data_feed.candles_feed.okx_perpetual_candles.okx_perpetual_candles import OKXPerpetualCandles
from hummingbot.data_feed.candles_feed.okx_spot_candles.okx_spot_candles import OKXSpotCandles


class UnsupportedConnectorException(Exception):
@@ -34,6 +35,7 @@ class CandlesFactory:
"kucoin": KucoinSpotCandles,
"ascend_ex": AscendExSpotCandles,
"okx_perpetual": OKXPerpetualCandles,
"okx": OKXSpotCandles,
"kraken": KrakenSpotCandles
}
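With the "okx" key registered above, the spot feed can be requested through the factory like any other connector. A minimal usage sketch, assuming the CandlesConfig data type and the get_candle classmethod already exposed by the candles_feed package (neither is part of this diff):

from hummingbot.data_feed.candles_feed.candles_factory import CandlesFactory
from hummingbot.data_feed.candles_feed.data_types import CandlesConfig

# "okx" resolves to OKXSpotCandles through the mapping above
candles = CandlesFactory.get_candle(CandlesConfig(connector="okx",
                                                  trading_pair="BTC-USDT",
                                                  interval="1m",
                                                  max_records=500))
candles.start()  # starts the websocket feed and the historical backfill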

hummingbot/data_feed/candles_feed/okx_perpetual_candles/okx_perpetual_candles.py
@@ -3,11 +3,13 @@
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from hummingbot.core.network_iterator import NetworkStatus, safe_ensure_future
from hummingbot.core.web_assistant.connections.data_types import WSJSONRequest
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.data_feed.candles_feed.candles_base import CandlesBase
from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_perpetual_candles import constants as CONSTANTS
from hummingbot.logger import HummingbotLogger

@@ -24,6 +26,25 @@ def logger(cls) -> HummingbotLogger:
def __init__(self, trading_pair: str, interval: str = "1m",
max_records: int = CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST):
super().__init__(trading_pair, interval, max_records)
self.interval_to_milliseconds_dict = {
"1s": 1000,
"1m": 60000,
"3m": 180000,
"5m": 300000,
"15m": 900000,
"30m": 1800000,
"1h": 3600000,
"2h": 7200000,
"4h": 14400000,
"6h": 21600000,
"8h": 28800000,
"12h": 43200000,
"1d": 86400000,
"3d": 259200000,
"1w": 604800000,
"1M": 2592000000,
"3M": 7776000000
}

@property
def name(self):
@@ -106,6 +127,27 @@ async def fill_historical_candles(self):
)
await self._sleep(1.0)

async def get_historical_candles(self, config: HistoricalCandlesConfig):
try:
all_candles = []
current_start_time = config.start_time
while current_start_time <= config.end_time:
current_end_time = current_start_time + self.interval_to_milliseconds_dict[config.interval] * CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST
fetched_candles = await self.fetch_candles(end_time=current_end_time)
if fetched_candles.size == 0:
break

all_candles.append(fetched_candles[::-1])
last_timestamp = fetched_candles[0][0] # Assuming the first column is the timestamp
current_start_time = int(last_timestamp)

final_candles = np.concatenate(all_candles, axis=0) if all_candles else np.array([])
candles_df = pd.DataFrame(final_candles, columns=self.columns)
candles_df.drop_duplicates(subset=["timestamp"], inplace=True)
return candles_df
except Exception as e:
self.logger().exception(f"Error fetching historical candles: {str(e)}")

async def _subscribe_channels(self, ws: WSAssistant):
"""
Subscribes to the candles events through the provided websocket connection.
3 changes: 3 additions & 0 deletions hummingbot/data_feed/candles_feed/okx_spot_candles/__init__.py
@@ -0,0 +1,3 @@
from hummingbot.data_feed.candles_feed.okx_spot_candles.okx_spot_candles import OKXSpotCandles

__all__ = ["OKXSpotCandles"]
42 changes: 42 additions & 0 deletions hummingbot/data_feed/candles_feed/okx_spot_candles/constants.py
@@ -0,0 +1,42 @@
from bidict import bidict

from hummingbot.core.api_throttler.data_types import LinkedLimitWeightPair, RateLimit

REST_URL = "https://www.okx.com"
WSS_URL = "wss://ws.okx.com:8443/ws/v5/business"

# GET / Candlesticks history (https://www.okx.com/docs-v5/en/?shell#order-book-trading-market-data-get-candlesticks-history)
# Retrieve history candlestick charts from recent years (only the last 3 months are supported for the 1s bar).
# Rate Limit: 20 requests per 2 seconds
# Rate limit rule: IP
CANDLES_ENDPOINT = "/api/v5/market/history-candles"
INTERVALS = bidict({
"1s": "1s",
"1m": "1m",
"3m": "3m",
"5m": "5m",
"15m": "15m",
"30m": "30m",
"1h": "1H",
"2h": "2H",
"4h": "4H",
"6h": "6Hutc",
"8h": "8Hutc",
"12h": "12Hutc",
"1d": "1Dutc",
"3d": "3Dutc",
"1w": "1Wutc",
"1M": "1Mutc",
"3M": "3Mutc"
})
MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST = 100

# Get system time (https://www.okx.com/docs-v5/en/?shell#public-data-rest-api-get-system-time)
# Retrieve API server time.
# Rate Limit: 10 requests per 2 seconds
# Rate limit rule: IP
HEALTH_CHECK_ENDPOINT = "/api/v5/public/time"

RATE_LIMITS = [
RateLimit(CANDLES_ENDPOINT, limit=20, time_interval=2, linked_limits=[LinkedLimitWeightPair("raw", 1)]),
RateLimit(HEALTH_CHECK_ENDPOINT, limit=10, time_interval=2, linked_limits=[LinkedLimitWeightPair("raw", 1)])]
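The INTERVALS bidict translates Hummingbot interval labels into the "bar" parameter OKX expects, and back again when parsing responses; a short illustration:

from hummingbot.data_feed.candles_feed.okx_spot_candles import constants as CONSTANTS

CONSTANTS.INTERVALS["1h"]             # "1H"    -> sent as the "bar" REST/WS parameter
CONSTANTS.INTERVALS["1d"]             # "1Dutc" -> daily bars anchored to the UTC open
CONSTANTS.INTERVALS.inverse["1Dutc"]  # "1d"    -> exchange bar mapped back to the local label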
202 changes: 202 additions & 0 deletions hummingbot/data_feed/candles_feed/okx_spot_candles/okx_spot_candles.py
@@ -0,0 +1,202 @@
import asyncio
import logging
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from hummingbot.core.network_iterator import NetworkStatus, safe_ensure_future
from hummingbot.core.web_assistant.connections.data_types import WSJSONRequest
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.data_feed.candles_feed.candles_base import CandlesBase
from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_spot_candles import constants as CONSTANTS
from hummingbot.logger import HummingbotLogger


class OKXSpotCandles(CandlesBase):
_logger: Optional[HummingbotLogger] = None

@classmethod
def logger(cls) -> HummingbotLogger:
if cls._logger is None:
cls._logger = logging.getLogger(__name__)
return cls._logger

def __init__(self, trading_pair: str, interval: str = "1m",
max_records: int = CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST):
super().__init__(trading_pair, interval, max_records)
self.interval_to_milliseconds_dict = {
"1s": 1000,
"1m": 60000,
"3m": 180000,
"5m": 300000,
"15m": 900000,
"30m": 1800000,
"1h": 3600000,
"2h": 7200000,
"4h": 14400000,
"6h": 21600000,
"8h": 28800000,
"12h": 43200000,
"1d": 86400000,
"3d": 259200000,
"1w": 604800000,
"1M": 2592000000,
"3M": 7776000000
}
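These millisecond durations size the REST pagination window used in get_historical_candles below; for example, for 1-minute bars one request of MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST candles covers:

window_ms = 60000 * CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST
# 60000 ms/candle * 100 candles = 6,000,000 ms, i.e. 100 minutes per paginated request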

@property
def name(self):
return f"okx_{self._trading_pair}"

@property
def rest_url(self):
return CONSTANTS.REST_URL

@property
def wss_url(self):
return CONSTANTS.WSS_URL

@property
def health_check_url(self):
return self.rest_url + CONSTANTS.HEALTH_CHECK_ENDPOINT

@property
def candles_url(self):
return self.rest_url + CONSTANTS.CANDLES_ENDPOINT

@property
def rate_limits(self):
return CONSTANTS.RATE_LIMITS

@property
def intervals(self):
return CONSTANTS.INTERVALS

async def check_network(self) -> NetworkStatus:
rest_assistant = await self._api_factory.get_rest_assistant()
await rest_assistant.execute_request(url=self.health_check_url,
throttler_limit_id=CONSTANTS.HEALTH_CHECK_ENDPOINT)
return NetworkStatus.CONNECTED

def get_exchange_trading_pair(self, trading_pair):
return trading_pair

async def fetch_candles(self,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: Optional[int] = 100):
rest_assistant = await self._api_factory.get_rest_assistant()
params = {"instId": self._ex_trading_pair, "bar": CONSTANTS.INTERVALS[self.interval], "limit": limit}
if end_time:
params["after"] = end_time
if start_time:
params["before"] = start_time
candles = await rest_assistant.execute_request(url=self.candles_url,
throttler_limit_id=CONSTANTS.CANDLES_ENDPOINT,
params=params)

arr = [[row[0], row[1], row[2], row[3], row[4], row[5], row[6], 0., 0., 0.] for row in candles["data"]]
return np.array(arr).astype(float)
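For reference, the history-candles endpoint returns newest-first rows of strings; the comprehension above keeps the first seven fields and pads three zero columns so every feed exposes the same 10-column layout. A sketch with made-up values:

# one illustrative row from the "data" array of the REST response (not real market data)
row = ["1705431540000", "42500.1", "42510.0", "42490.0", "42505.5", "12.3", "523000.0", "523000.0", "1"]
candle = np.array([row[0], row[1], row[2], row[3], row[4], row[5], row[6], 0., 0., 0.]).astype(float)
# -> [timestamp, open, high, low, close, volume, quote_asset_volume, n_trades,
#     taker_buy_base_volume, taker_buy_quote_volume], where the last three are padded with zeros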

async def fill_historical_candles(self):
max_request_needed = (self._candles.maxlen // CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST) + 1
requests_executed = 0
while not self.ready:
missing_records = self._candles.maxlen - len(self._candles)
end_timestamp = int(self._candles[0][0])
try:
if requests_executed < max_request_needed:
# we have to add one more since the last row is not going to be included
candles = await self.fetch_candles(end_time=end_timestamp, limit=missing_records + 1)
# we recompute the quantity of missing records because the websocket process can modify the
# deque, and extending it with a stale count would drop the newest observations.
missing_records = self._candles.maxlen - len(self._candles)
self._candles.extendleft(candles[-(missing_records + 1):-1])
requests_executed += 1
else:
self.logger().error(f"There is no data available for the quantity of "
f"candles requested for {self.name}.")
raise
except asyncio.CancelledError:
raise
except Exception:
self.logger().exception(
"Unexpected error occurred when getting historical klines. Retrying in 1 seconds...",
)
await self._sleep(1.0)

async def get_historical_candles(self, config: HistoricalCandlesConfig):
try:
all_candles = []
current_start_time = config.start_time
while current_start_time <= config.end_time:
current_end_time = current_start_time + self.interval_to_milliseconds_dict[config.interval] * CONSTANTS.MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST
fetched_candles = await self.fetch_candles(end_time=current_end_time)
if fetched_candles.size == 0:
break

all_candles.append(fetched_candles[::-1])
last_timestamp = fetched_candles[0][0] # Assuming the first column is the timestamp
current_start_time = int(last_timestamp)

final_candles = np.concatenate(all_candles, axis=0) if all_candles else np.array([])
candles_df = pd.DataFrame(final_candles, columns=self.columns)
candles_df.drop_duplicates(subset=["timestamp"], inplace=True)
return candles_df
except Exception as e:
self.logger().exception(f"Error fetching historical candles: {str(e)}")
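A minimal standalone sketch of the pagination above (timestamps in milliseconds, matching interval_to_milliseconds_dict; the call hits the live OKX REST endpoint, so network access is assumed):

import asyncio

from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_spot_candles import OKXSpotCandles


async def main():
    feed = OKXSpotCandles(trading_pair="BTC-USDT", interval="1m")
    config = HistoricalCandlesConfig(connector_name="okx",
                                     trading_pair="BTC-USDT",
                                     interval="1m",
                                     start_time=1705420800000,   # ms
                                     end_time=1705431600000)     # ms
    candles_df = await feed.get_historical_candles(config)
    print(candles_df.head())

asyncio.run(main())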

async def _subscribe_channels(self, ws: WSAssistant):
"""
Subscribes to the candles events through the provided websocket connection.
:param ws: the websocket assistant used to connect to the exchange
"""
try:
candle_args = []
candle_args.append({"channel": f"candle{CONSTANTS.INTERVALS[self.interval]}", "instId": self._ex_trading_pair})
payload = {
"op": "subscribe",
"args": candle_args
}
subscribe_candles_request: WSJSONRequest = WSJSONRequest(payload=payload)

await ws.send(subscribe_candles_request)
self.logger().info("Subscribed to public klines...")
except asyncio.CancelledError:
raise
except Exception:
self.logger().error(
"Unexpected error occurred subscribing to public klines...",
exc_info=True
)
raise

async def _process_websocket_messages(self, websocket_assistant: WSAssistant):
async for ws_response in websocket_assistant.iter_messages():
data: Dict[str, Any] = ws_response.data
if data is not None and "data" in data: # data will be None when the websocket is disconnected
candles = data["data"][0]
timestamp = candles[0]
open = candles[1]
high = candles[2]
low = candles[3]
close = candles[4]
volume = candles[5]
quote_asset_volume = candles[6]
n_trades = 0.
taker_buy_base_volume = 0.
taker_buy_quote_volume = 0.

candles_row = np.array([timestamp, open, high, low, close, volume,
quote_asset_volume, n_trades, taker_buy_base_volume,
taker_buy_quote_volume]).astype(float)
if len(self._candles) == 0:
self._candles.append(candles_row)
safe_ensure_future(self.fill_historical_candles())
elif int(timestamp) > int(self._candles[-1][0]):
self._candles.append(candles_row)
elif int(timestamp) == int(self._candles[-1][0]):
self._candles.pop()
self._candles.append(candles_row)
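For context, OKX's candle channel pushes messages shaped roughly like the sample below (values are illustrative); the handler appends a row when the bar timestamp is new, replaces the last row when the still-open bar is updated, and kicks off fill_historical_candles() on the first message received:

# illustrative (made-up) push message from the candle1m channel
ws_message = {
    "arg": {"channel": "candle1m", "instId": "BTC-USDT"},
    "data": [["1705431540000", "42500.1", "42510.0", "42490.0", "42505.5",
              "12.3", "523000.0", "523000.0", "0"]],
}
# data[0][:7] fills the first seven candle columns; n_trades and the taker-buy
# volumes are padded with zeros, mirroring the REST path.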
test/hummingbot/data_feed/candles_feed/test_okx_perpetual_candles.py
@@ -5,9 +5,11 @@
from typing import Awaitable
from unittest.mock import AsyncMock, MagicMock, patch

import numpy as np
from aioresponses import aioresponses

from hummingbot.connector.test_support.network_mocking_assistant import NetworkMockingAssistant
from hummingbot.data_feed.candles_feed.data_types import HistoricalCandlesConfig
from hummingbot.data_feed.candles_feed.okx_perpetual_candles import OKXPerpetualCandles, constants as CONSTANTS


@@ -47,6 +49,11 @@ def async_run_with_timeout(self, coroutine: Awaitable, timeout: int = 1):
ret = asyncio.get_event_loop().run_until_complete(asyncio.wait_for(coroutine, timeout))
return ret

def get_fetched_candles_data_mock(self):
candles = self.get_candles_rest_data_mock()
arr = [[row[0], row[1], row[2], row[3], row[4], row[6], row[7], 0., 0., 0.] for row in candles["data"][::-1]]
return np.array(arr).astype(float)

def get_candles_rest_data_mock(self):
data = {
"code": "0",
@@ -149,6 +156,25 @@ def test_fetch_candles(self, mock_api: aioresponses):
self.assertEqual(resp.shape[0], len(data_mock["data"]))
self.assertEqual(resp.shape[1], 10)

@patch("hummingbot.data_feed.candles_feed.okx_perpetual_candles.OKXPerpetualCandles.fetch_candles", new_callable=AsyncMock)
def test_get_historical_candles(self, fetched_candles_mock):
config = HistoricalCandlesConfig(connector_name="okx_perpetual",
trading_pair=self.ex_trading_pair,
interval=self.interval,
start_time=1705420800000,
end_time=1705431600000)
resp_1 = self.get_fetched_candles_data_mock()
resp_2 = np.array([])
fetched_candles_mock.side_effect = [resp_1, resp_2]
candles_df = self.async_run_with_timeout(self.data_feed.get_historical_candles(config))

# Check the response
self.assertEqual(candles_df.shape[0], len(resp_1))
self.assertEqual(candles_df.shape[1], 10)

# Check candles integrity: consecutive timestamps should differ by exactly one interval (in milliseconds) with a constant sign
self.assertEqual(len(candles_df["timestamp"].diff()[1:].unique()), 1, "Timestamp diff should be constant")

def test_candles_empty(self):
self.assertTrue(self.data_feed.candles_df.empty)

Empty file.
