Skip to content

AbstractCCXTInputPlugin Specification

eprbell edited this page Aug 26, 2022 · 10 revisions

The objective of this abstract superclass is to minimize the effort required to bring up a new CCXT-based data loader plugin in DaLI.

class _ProcessingResult(NamedTuple):
    """Bundle of the three transaction lists handed back by one processing step.

    `_process_deposits` reads each field of every non-None result and extends
    its own accumulator lists with it.
    """

    # Transactions to be appended to the caller's in-transaction list.
    in_transactions: List[InTransaction]
    # Transactions to be appended to the caller's out-transaction list.
    out_transactions: List[OutTransaction]
    # Transactions to be appended to the caller's intra-transaction list.
    intra_transactions: List[IntraTransaction]

class AbstractPaginationDetails:
    """Base type for the object that drives a paginated fetch loop.

    The body is left unspecified in this spec. From its use in
    `_process_deposits`, an instance is expected to offer
    `evaluate_loop_expression(...)` (the `while` condition) and the attributes
    `symbol`, `since`, `limit` and `parameters`, which are passed to the
    client's fetch call.
    """
    ...

class DateBasedPaginationDetails(AbstractPaginationDetails):
    """Pagination details variant; body unspecified in this spec.

    NOTE(review): presumably pages through results by date/time window - confirm when implemented.
    """
    ...

class IdBasedPaginationDetails(AbstractPaginationDetails):
    """Pagination details variant; body unspecified in this spec.

    NOTE(review): presumably pages through results by record id - confirm when implemented.
    """
    ...

class PageNumberBasedPaginationDetails(AbstractPaginationDetails):
    """Pagination details variant; body unspecified in this spec.

    NOTE(review): presumably pages through results by page number - confirm when implemented.
    """
    ...

class AbstractCCXTInputPlugin(AbstractInputPlugin):
    """Abstract superclass for DaLI data loader plugins that reach an exchange through a CCXT client.

    A concrete plugin overrides the abstract methods (names, logger, client, the three
    pagination-details getters and ``_process_implicit_api``). ``load()`` then runs the
    deposit, withdrawal and trade processing and returns every transaction collected.
    """

    def __init__(
        self,
        account_holder: str,
        native_fiat: str,
        exchange_start_time: Optional[datetime],
        thread_count: Optional[int] = None,
    ) -> None:
        """Capture logger, cache key and client from the subclass and store the start time.

        Args:
            account_holder: forwarded to ``AbstractInputPlugin.__init__``.
            native_fiat: forwarded to ``AbstractInputPlugin.__init__``.
            exchange_start_time: start time for this exchange; also stored as epoch
                milliseconds (whole seconds scaled by ``_MS_IN_SECOND``).
            thread_count: size of the thread pool used when processing a page of
                results; ``None`` (or 0) falls back to 1.
        """
        super().__init__(account_holder, native_fiat)
        self.__logger: logging.Logger = self.logger()
        # cache_key() is declared as Optional[str], so the attribute is Optional too.
        self.__cache_key: Optional[str] = self.cache_key()
        self.__client: Exchange = self.initialize_client()

        # Market ids, filled in by load() from the client's fetch_markets() call.
        self.__markets: List[str] = []

        # NOTE(review): exchange_start_time is annotated Optional, but None would fail on
        # .timestamp() below. The spec does not say which default to use - to be decided.
        self.__start_time: datetime = exchange_start_time
        # int() drops the fractional seconds before scaling to milliseconds.
        self.__start_time_ms: int = int(self.__start_time.timestamp()) * _MS_IN_SECOND

        self.__thread_count: int = thread_count if thread_count else 1

    def plugin_name(self) -> Optional[str]:
        """Return the name of this plugin.

        Distinct from ``exchange_name()``: two plugins (e.g. a REST plugin and a
        supplemental CSV plugin) can serve the same exchange under different plugin names.
        """
        raise NotImplementedError("Abstract method")

    def cache_key(self) -> Optional[str]:
        """Return this plugin's cache key; read once at construction time."""
        raise NotImplementedError("Abstract method")

    def logger(self) -> logging.Logger:
        """Return the logger to use; read once at construction time."""
        raise NotImplementedError("Abstract method")

    def initialize_client(self) -> Exchange:
        """Create and return the exchange client; called once at construction time."""
        raise NotImplementedError("Abstract method")

    def exchange_name(self) -> str:
        """Return the name of the exchange whose data this plugin processes."""
        raise NotImplementedError("Abstract method")

    def get_process_deposits_pagination_details(self) -> AbstractPaginationDetails:
        """Return the pagination details that drive the fetch loop in ``_process_deposits``."""
        raise NotImplementedError("Abstract method")

    def get_process_withdrawals_pagination_details(self) -> AbstractPaginationDetails:
        """Return the pagination details for withdrawal processing."""
        raise NotImplementedError("Abstract method")

    def get_process_trades_pagination_details(self) -> AbstractPaginationDetails:
        """Return the pagination details for trade processing."""
        raise NotImplementedError("Abstract method")

    def _process_implicit_api(
        self,
        in_transactions: List[InTransaction],
        out_transactions: List[OutTransaction],
        intra_transactions: List[IntraTransaction],
    ) -> None:
        """Hook called last by ``load()`` for data that has no unified client function.

        Subclasses fetch such data (e.g. gains) through the exchange's implicit API
        and append the resulting transactions to the given lists.
        """
        raise NotImplementedError("Abstract method")

    def load(self) -> List[AbstractTransaction]:
        """Fetch the market ids, run every processing step and return all transactions.

        Returns:
            The in-, out- and intra-transactions collected by the processing steps,
            concatenated in that order.
        """
        in_transactions: List[InTransaction] = []
        out_transactions: List[OutTransaction] = []
        intra_transactions: List[IntraTransaction] = []

        # The client is stored as a private attribute; there is no `client` property.
        ccxt_markets: Any = self.__client.fetch_markets()
        for market in ccxt_markets:
            self.__logger.debug("Market: %s", json.dumps(market))
            self.__markets.append(market[_ID])

        # TODO - pull valid fiat from abstract_converter_plugin?
        # This is sometimes needed to filter out buys from conversions.

        self._process_deposits(in_transactions, out_transactions, intra_transactions)
        self._process_withdrawals(out_transactions, intra_transactions)
        self._process_trades(in_transactions, out_transactions)
        self._process_implicit_api(in_transactions, out_transactions, intra_transactions)

        # The processing steps only fill the three lists above, so they have to be
        # merged here; otherwise load() would always return an empty list.
        result: List[AbstractTransaction] = []
        result.extend(in_transactions)
        result.extend(out_transactions)
        result.extend(intra_transactions)
        return result

    def _process_deposits(
        self,
        in_transactions: List[InTransaction],
        out_transactions: List[OutTransaction],
        intra_transactions: List[IntraTransaction],
    ) -> None:
        """Fetch deposits page by page and append the resulting transactions to the given lists.

        Each page is fetched with the client's ``fetch_deposits`` using the current
        pagination details, then every deposit of the page is handed to
        ``_process_transfer`` on a thread pool of ``thread_count`` workers.
        """
        processing_result_list: List[Optional[_ProcessingResult]]
        pagination_details: AbstractPaginationDetails = self.get_process_deposits_pagination_details()
        # parameters need to be passed separately
        # The arguments of evaluate_loop_expression are left open by this spec (`...`).
        while pagination_details.evaluate_loop_expression(...):
            crypto_deposits = self.__client.fetch_deposits(
                symbol=pagination_details.symbol,
                since=pagination_details.since,
                limit=pagination_details.limit,
                params=pagination_details.parameters,
            )

            with ThreadPool(self.__thread_count) as pool:
                processing_result_list = pool.map(self._process_transfer, crypto_deposits)

            for processing_result in processing_result_list:
                # None means the item produced nothing to record.
                if processing_result is None:
                    continue
                if processing_result.in_transactions:
                    in_transactions.extend(processing_result.in_transactions)
                if processing_result.out_transactions:
                    out_transactions.extend(processing_result.out_transactions)
                if processing_result.intra_transactions:
                    intra_transactions.extend(processing_result.intra_transactions)
        ...

    def _process_gains(self) -> Optional[_ProcessingResult]:
        """Process gains (body unspecified in this spec; not called by ``load()``)."""
        ...

    def _process_trades(self, in_transactions: List[InTransaction], out_transactions: List[OutTransaction]) -> None:
        """Process trades, appending to the given lists (body unspecified in this spec).

        The signature matches the call made by ``load()``.
        """
        ...

    def _process_withdrawals(self, out_transactions: List[OutTransaction], intra_transactions: List[IntraTransaction]) -> None:
        """Process withdrawals, appending to the given lists (body unspecified in this spec).

        The signature matches the call made by ``load()``.
        """
        ...

    def _process_buy(
        self, transaction: Any, in_transaction_list: List[InTransaction], out_transaction_list: List[OutTransaction], notes: Optional[str] = None
    ) -> None:
        """Process a single buy ``transaction`` (body unspecified in this spec)."""
        ...

    def _process_deposit(self, transaction: Any, in_transaction_list: List[InTransaction], notes: Optional[str] = None) -> None:
        """Process a single deposit ``transaction`` (body unspecified in this spec)."""
        ...

    def _process_gain(self, transaction: Any, transaction_type: Keyword, in_transaction_list: List[InTransaction], notes: Optional[str] = None) -> None:
        """Process a single gain ``transaction`` of the given type (body unspecified in this spec)."""
        ...

    def _process_sell(self, transaction: Any, out_transaction_list: List[OutTransaction], notes: Optional[str] = None) -> None:
        """Process a single sell ``transaction`` (body unspecified in this spec)."""
        ...

    def _process_transfer(
        self, transaction: Any, intra_transaction_list: Optional[List[IntraTransaction]] = None
    ) -> Optional[_ProcessingResult]:
        """Process a single transfer ``transaction`` (body unspecified in this spec).

        ``_process_deposits`` maps this method over a page of deposits with
        ``ThreadPool.map``, which passes only ``transaction`` and expects a
        ``_ProcessingResult`` (or None) back. ``intra_transaction_list`` is therefore
        optional, so both that call and two-argument calls are accepted.
        """
        ...

    def _process_withdrawal(self, transaction: Any, out_transaction_list: List[OutTransaction], notes: Optional[str] = None) -> None:
        """Process a single withdrawal ``transaction`` (body unspecified in this spec)."""
        ...

V1 (obsolete):

class AbstractPaginationDetails:
    """Base type for pagination details in the obsolete V1 listing; body unspecified.

    The V1 `_process_deposits` calls `evaluate_loop_expression(...)` on an
    instance and passes its `parameters` attribute to the client's fetch call.
    """
    ...

class DateBasedPaginationDetails(AbstractPaginationDetails):
    """Pagination details variant in the obsolete V1 listing; body unspecified.

    NOTE(review): presumably pages by date/time window - confirm when implemented.
    """
    ...

class IdBasedPaginationDetails(AbstractPaginationDetails):
    """Pagination details variant in the obsolete V1 listing; body unspecified.

    NOTE(review): presumably pages by record id - confirm when implemented.
    """
    ...

class PageNumberBasedPaginationDetails(AbstractPaginationDetails):
    """Pagination details variant in the obsolete V1 listing; body unspecified.

    NOTE(review): presumably pages by page number - confirm when implemented.
    """
    ...

# The method_2_pagination_details maps client methods to pagination details, so that it's possible to have one pagination detail per method. This mapping is passed in from the subclass at instantiation time.
class AbstractCCXTInputPlugin(AbstractInputPlugin):
    """First draft (V1, obsolete) of the abstract CCXT input plugin.

    Kept for the design discussion recorded in the inline comments (eprbell /
    macanudo527). The code is a sketch and is left exactly as drafted; known
    inconsistencies are flagged with NOTE(review) comments rather than fixed.
    """

    def __init__(
        self,
        account_holder: str,
        native_fiat: str,
#        method_2_pagination_details: Dict[..., _AbstractPaginationDetails] = _DEFAULT_METHOD_2_PAGINATION_DETAILS,
        exchange_start_time: Optional[datetime],
    ) -> None:

        super().__init__(account_holder, native_fiat)
        # Logger, cache key and client all come from the subclass's overrides.
        self.__logger: logging.Logger = self.logger()
        self.__cache_key: str = self.cache_key()
        self.__client: Exchange = self.initialize_client()

        # We have to know what markets exist so that we can pull orders using the market
        self.__markets: List[str] = []

        # NOTE(review): exchange_start_time is Optional, but None would fail on .timestamp() below.
        self.__start_time: datetime = exchange_start_time
        self.__start_time_ms: int = int(self.__start_time.timestamp()) * _MS_IN_SECOND

    # macanudo527: Changed this to be clearer
    def plugin_name(self) -> Optional[str]:
        raise NotImplementedError("Abstract method")

    def cache_key(self) -> Optional[str]:
        raise NotImplementedError("Abstract method")

    def logger(self) -> logging.Logger:
        raise NotImplementedError("Abstract method")

    def initialize_client(self) -> Exchange:
        raise NotImplementedError("Abstract method")

    # eprbell: also, abstract method raising NotImplementedError
    # eprbell: what is the difference between exchange_name and plugin_name?
    # macanudo527: Both are required for the transaction record, but basically, you could have two different plugins that
    # service one exchange. For example, Binance.com has a REST plugin and a supplemental CSV, both would need separate plugin
    # names, but process data from the same exchange.
    def exchange_name(self) -> str:
        raise NotImplementedError("Abstract method")

    def get_process_deposits_pagination_details(self) -> AbstractPaginationDetails:
        raise NotImplementedError("Abstract method")

    def get_process_withdrawals_pagination_details(self) -> AbstractPaginationDetails:
        raise NotImplementedError("Abstract method")

    def get_process_trades_pagination_details(self) -> AbstractPaginationDetails:
        raise NotImplementedError("Abstract method")

    def load(self) -> List[AbstractTransaction]:
        """Collect the market ids, run the processing steps and merge their results into one list."""
        result: List[AbstractTransaction] = []
        in_transactions: List[InTransaction] = []
        out_transactions: List[OutTransaction] = []
        intra_transactions: List[IntraTransaction] = []
        processed_account_results: List[_ProcessAccountResult] = []

        # NOTE(review): `self.client` and `self.markets` are not defined in this listing;
        # __init__ stores them as the private `__client` and `__markets`.
        ccxt_markets: Any = self.client.fetch_markets()
        for market in ccxt_markets:
            self.__logger.debug("Market: %s", json.dumps(market))
            self.markets.append(market[_ID])

        # TODO - pull valid fiat from abstract_converter_plugin?
        # This is sometimes needed to filter out buys from conversions.

        # For Binance.com the unified function for fetch_deposits() only retrieves crypto deposits unless given a fiat iso code.
        # We will have to use the implicit API
        # eprbell: sounds good: superclass uses unified, subclasses can add implicit
        # NOTE(review): the _process_* methods below are annotated `-> None`, so extend()
        # would receive None here.
        processed_account_results.extend(self._process_deposits(in_transactions, out_transactions, intra_transactions))

        processed_account_results.extend(self._process_withdrawals(out_transactions, intra_transactions))

        processed_account_results.extend(self._process_trades(in_transactions, out_transactions))

        # No unified functions for gains, they will have to be resolved with the implicit API
        # eprbell: ok
        # NOTE(review): the next line is missing its closing parenthesis, and
        # _process_implicit_api is not declared in this listing.
        processed_account_results.extend(self._process_implicit_api(in_transactions, out_transactions, intra_transactions)

        for processed_result in processed_account_results:
             result.extend(processed_result.in_transactions)
             result.extend(processed_result.out_transactions)
             result.extend(processed_result.intra_transactions)

        return result

    def _process_deposits(
        self,
        in_transactions: List[InTransaction],
        out_transactions: List[OutTransaction],
        intra_transactions: List[IntraTransaction],
    ) -> None:

        # This can be contained in the pagination_details
        # current_start = self.start_time_ms

        # eprbell: could we do something like this?
        # macanudo527: Yeah, this could work. I'm pretty sure it would just need the result set to evaluate, so it wouldn't be
        # too messy.
        # The parameters will have to be updated during the call to the evaluate_loop_expression. I *think* I can get this to work.
        # NOTE(review): `pagination_details` is never assigned in this method, and the
        # `while` line below is missing its trailing colon.
        while pagination_details.evaluate_loop_expression(...)
            crypto_deposits = self.__client.fetch_deposits(params=pagination_details.parameters)
        ...

    def _process_gains(self, in_transactions: List[InTransaction], out_transactions: List[OutTransaction]) -> None:
        ...

    def _process_trades(self, in_transactions: List[InTransaction], out_transactions: List[OutTransaction]) -> None:
        ...

    def _process_withdrawals(self, out_transactions: List[OutTransaction], intra_transactions: List[IntraTransaction]) -> None:
        ...

    # The per-transaction helpers below are signatures only; their bodies are not part of this draft.
    def _process_buy(
        self, transaction: Any, in_transaction_list: List[InTransaction], out_transaction_list: List[OutTransaction], notes: Optional[str] = None
    ) -> None:
        ...

    def _process_deposit(self, transaction: Any, in_transaction_list: List[InTransaction], notes: Optional[str] = None) -> None:
        ...

    def _process_gain(self, transaction: Any, transaction_type: Keyword, in_transaction_list: List[InTransaction], notes: Optional[str] = None) -> None:
        ...

    def _process_sell(self, transaction: Any, out_transaction_list: List[OutTransaction], notes: Optional[str] = None) -> None:
        ...

    def _process_transfer(self, transaction: Any, intra_transaction_list: List[IntraTransaction]) -> None:
        ...

    def _process_withdrawal(self, transaction: Any, out_transaction_list: List[OutTransaction], notes: Optional[str] = None) -> None:
        ...
Clone this wiki locally