From 05b94f87dcd255ddf9fe749d6800b81ef722f0d5 Mon Sep 17 00:00:00 2001 From: Michael Bromilow <12384431+ingrinder@users.noreply.github.com> Date: Wed, 10 Jan 2024 19:24:53 +0000 Subject: [PATCH] (WIP) Add concurrent output option Building on slight refactoring of output, allow entries to appear as soon as they are available rather than waiting until all complete. --- archey/__main__.py | 31 ++++-- archey/colors.py | 19 ++-- archey/entry.py | 6 +- archey/output.py | 264 +++++++++++++++++++++++++++++++++++---------- config.json | 1 + 5 files changed, 252 insertions(+), 69 deletions(-) diff --git a/archey/__main__.py b/archey/__main__.py index 4c99a9db..7804a83e 100644 --- a/archey/__main__.py +++ b/archey/__main__.py @@ -10,7 +10,7 @@ import logging import os import sys -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import ExitStack from enum import Enum from typing import Callable, Optional @@ -124,6 +124,7 @@ def args_parsing() -> argparse.Namespace: return parser.parse_args() +# todo: refactor out into functions/methods (too many branches) def main(): """Simple entry point""" args = args_parsing() @@ -152,13 +153,17 @@ def main(): format_to_json=args.json, ) + # Begin with some output as soon as possible + output.begin_output() + # We will map this function onto our enabled entries to instantiate them. - def _entry_instantiator(entry: dict) -> Optional[Entry]: + def _entry_instantiator(entry: dict, index: int) -> Optional[Entry]: # Based on **required** `type` field, instantiate the corresponding `Entry` object. try: return Entries[entry.pop("type")].value( name=entry.pop("name", None), # `name` is fully-optional. options=entry, # Remaining fields should be propagated as options. + index=index, # Provide entry with its position index. ) except KeyError as key_error: logging.warning("One entry (misses or) uses an invalid `type` field (%s).", key_error) @@ -182,11 +187,25 @@ def _entry_instantiator(entry: dict) -> Optional[Entry]: ) mapper = executor.map - for entry_instance in mapper(_entry_instantiator, available_entries): - if entry_instance: - output.add_entry(entry_instance) + # todo: solution fails if parallel_loading false because no executor + if configuration.get("output_concurrency"): + tasks = [] + for index, entry in enumerate(available_entries): + future = executor.submit(_entry_instantiator, entry, index) + output.add_entry_concurrent(future) + tasks.append(future) + + for future in as_completed(tasks): + output.entry_future_done_callback(future) + + else: + for entry_instance in mapper( + _entry_instantiator, available_entries, range(len(available_entries)) + ): + if entry_instance: + output.add_entry(entry_instance) - output.output() + output.finish_output() # Has the screenshot flag been specified ? if args.screenshot is not None: diff --git a/archey/colors.py b/archey/colors.py index a8d0f80a..7117da14 100644 --- a/archey/colors.py +++ b/archey/colors.py @@ -119,14 +119,15 @@ class TerminalMovements(Enum): TO_COLUMN = "G" TO_POSITION = "H" - # override `Style` - def __str__(self): - return Style.escape_code_from_attrs(";".join(map(str, (1, self.value)))) + ERASE_LINE = "K" - # Python 3.6 compatibility due to string format changes, see - # (bpo-28794) - def __format__(self, _): - return super().__str__() + def move(self, *amounts): + """ + Returns an ANSI escape code corresponding to the movement given. + Use `TerminalMovements` for the direction, and specify amounts 1-indexed, row before column. + """ + display_attrs = ";".join(map(str, (*amounts, self.value))) + return f"\x1b[{display_attrs}" class CursorPosition(Style): @@ -141,4 +142,6 @@ def move(movement: TerminalMovements, *amounts): Returns an ANSI escape code corresponding to the movement given. Use `TerminalMovements` for the direction, and specify amounts 1-indexed, row before column. """ - return Style.escape_code_from_attrs(";".join(map(str, (*amounts, movement)))) + # return Style.escape_code_from_attrs(";".join(map(str, (*amounts, movement.value)))) + display_attrs = ";".join(map(str, (*amounts, movement.value))) + return f"\x1b[{display_attrs}" diff --git a/archey/entry.py b/archey/entry.py index cdb34b9a..33febb7b 100644 --- a/archey/entry.py +++ b/archey/entry.py @@ -22,14 +22,18 @@ def __new__(cls, *_, **kwargs): return super().__new__(cls) @abstractmethod - def __init__(self, name: Optional[str] = None, value=None, options: Optional[dict] = None): + def __init__( + self, name: Optional[str] = None, value=None, options: Optional[dict] = None, index: int = 0 + ): # Each entry will have always have the following attributes... # `name`: key (defaults to the instantiated entry class name); # `value`: value of entry as an appropriate object; # `options`: configuration options *specific* to an entry instance; + # 'index': 0-based index (i.e. location in the list) of the entry; self.name = name or self._PRETTY_NAME or self.__class__.__name__ self.value = value self.options = options or {} + self.index = index # Propagates a reference to default strings specified in `Configuration`. self._default_strings = Configuration().get("default_strings") diff --git a/archey/output.py b/archey/output.py index 65f2b433..44b77c55 100644 --- a/archey/output.py +++ b/archey/output.py @@ -3,14 +3,17 @@ It supports entries lazy-insertion, logo detection, and final printing. """ +import logging import os import sys +from bisect import insort +from concurrent.futures import Future, wait from shutil import get_terminal_size from textwrap import TextWrapper from typing import cast from archey.api import API -from archey.colors import ANSI_ECMA_REGEXP, Colors, Style +from archey.colors import ANSI_ECMA_REGEXP, Colors, CursorPosition, Style, TerminalMovements from archey.configuration import Configuration from archey.distributions import Distributions from archey.entry import Entry @@ -27,12 +30,13 @@ class Output: __LOGO_RIGHT_PADDING = " " def __init__(self, **kwargs): - configuration = Configuration() + self.configuration = Configuration() + self._logger = logging.getLogger(self.__module__) # Fetches passed arguments. self._format_to_json = kwargs.get("format_to_json") preferred_logo_style = ( - kwargs.get("preferred_logo_style") or configuration.get("logo_style") or "" + kwargs.get("preferred_logo_style") or self.configuration.get("logo_style") or "" ).upper() try: @@ -52,42 +56,69 @@ def __init__(self, **kwargs): else: self._logo, self._colors = logo_module.LOGO.copy(), logo_module.COLORS.copy() + # Compute the effective logo "width" and "height" from the loaded ASCII art. + self.logo_width = get_logo_width(self._logo, len(self._colors)) + len( + self.__LOGO_RIGHT_PADDING + ) + self.logo_height = len(self._logo) + self.output_exceeded_term_flag = False + # If `os-release`'s `ANSI_COLOR` option is set, honor it. ansi_color = Distributions.get_ansi_color() - if ansi_color and configuration.get("honor_ansi_color"): + if ansi_color and self.configuration.get("honor_ansi_color"): # Replace each Archey integrated colors by `ANSI_COLOR`. self._colors = len(self._colors) * [Style.escape_code_from_attrs(ansi_color)] - entries_color = configuration.get("entries_color") + entries_color = self.configuration.get("entries_color") self._entries_color = ( Style.escape_code_from_attrs(entries_color) if entries_color else self._colors[0] ) # Each entry will be added to this list self._entries = [] - # Each class output will be added in the list below afterwards - self._results = [] + # Each entry `Future` will be added to this set + self._entry_futures = set() def add_entry(self, module: Entry) -> None: """Append an entry to the list of entries to output""" self._entries.append(module) - def output(self) -> None: + def add_entry_concurrent(self, entry_future: Future): + """Append an entry to the list of future entries to output""" + self._entry_futures.add(entry_future) + + def entry_future_done_callback(self, entry_future: Future): + """Add entry to entries list once it has instantiated""" + entry = entry_future.result() + self._entries.append(entry) + if not self._format_to_json and self.configuration.get("output_concurrency"): + self._output_update() + + def begin_output(self) -> None: """ - Main `Output`'s `output` method. - First we get entries to add their outputs to the results and then - calls specific `output` methods based (for instance) on preferred format. + Main method for `Output` to begin outputting entries. + Used to enable to output of logo and finished entries while others are still being + instantiated. + """ + if not self._format_to_json and self.configuration.get("output_concurrency"): + self._output_logo_standalone(0, 0) # assume no padding is necessary to begin with + + def finish_output(self) -> None: + """ + Method to end `Output` output, used to stop output as all entries are finished. + The API requires completion of all entries before output begins, so it's only interacted + with here. """ if self._format_to_json: + wait(self._entry_futures) self._output_json() + elif self.configuration.get("output_concurrency"): + print(CursorPosition.move(TerminalMovements.DOWN, self.logo_height - 1)) + if self.output_exceeded_term_flag: + self._logger.warning("The output was cut off due to the terminal height.") else: - # Iterate through the entries and get their content. - for entry in self._entries: - for entry_line in entry.pretty_value: - self._results.append( - f"{self._entries_color}{entry_line[0]}:{Colors.CLEAR} {entry_line[1]}" - ) - self._output_text() + wait(self._entry_futures) + self._output_legacy() def _output_json(self) -> None: """ @@ -96,32 +127,121 @@ def _output_json(self) -> None: """ print(API(self._entries).json_serialization(indent=cast(int, self._format_to_json) - 1)) - def _output_text(self) -> None: - """ - Finally render the output entries. - It handles text centering additionally to value and colors replacing. - """ - # Compute the effective logo "width" from the loaded ASCII art. - logo_width = get_logo_width(self._logo, len(self._colors)) + def _output_logo_standalone(self, padding_top: int, padding_bottom: int) -> None: + # Safely mutable local copy + logo = self._logo.copy() + + # Add padding to the top & bottom of the logo for centering + colored_empty_line = [str(self._colors[0]) + " " * self.logo_width] + logo[0:0] = colored_empty_line * padding_top + logo.extend(colored_empty_line * padding_bottom) + + avail_term_height = get_terminal_size().lines - 1 # - 1 due to ending newline + self.logo_height = min(len(logo), avail_term_height) + + # If the output was cut-off then set a flag that it happened + if len(logo) != self.logo_height: + self.output_exceeded_term_flag = True + + logo_output = os.linesep.join( + [f"{logo_part}{self.__LOGO_RIGHT_PADDING}" for logo_part in logo][-avail_term_height:] + ) + + try: + # Print the logo (the default `end` adds a trailing newline) + print(logo_output.format(c=self._colors) + str(Colors.CLEAR)) + + # Move the cursor to the top-left + print(CursorPosition.move(TerminalMovements.PREV_LINE, self.logo_height), end="") + + except UnicodeError as unicode_error: + raise ArcheyException( + """\ +Your locale or TTY does not seem to support UTF-8 encoding. +Please disable Unicode within your configuration file.\ +""" + ) from unicode_error + + def _output_update(self) -> None: + # Build a dict of entries + entries_dict = {} + for entry in self._entries: + entries_dict[entry.index] = entry + + # List of all entry lines + results = [] + # The total number of futures we have is the number of entries we have. + # Assume one blank line for each incomplete one, inserting available entries. + for idx in range(len(self._entry_futures)): + try: + # Add entry's lines to results + for entry_line in entries_dict[idx].pretty_value: + results.append( + f"{self._entries_color}{entry_line[0]}:{Colors.CLEAR} {entry_line[1]}" + ) + except KeyError: + # Entry not available yet, add a blank line + results.append("") - # Let's center the entries and the logo (handles odd numbers) - height_diff = len(self._logo) - len(self._results) + # Let's center the entries and the logo vertically (handles odd numbers) + logo_padding_top = 0 + logo_padding_bottom = 0 + height_diff = len(self._logo) - len(results) if height_diff >= 0: - self._results[0:0] = [""] * (height_diff // 2) - self._results.extend([""] * (len(self._logo) - len(self._results))) + results[0:0] = [""] * (height_diff // 2) + results.extend([""] * (len(self._logo) - len(results))) else: - colored_empty_line = [str(self._colors[0]) + " " * logo_width] - self._logo[0:0] = colored_empty_line * (-height_diff // 2) - self._logo.extend(colored_empty_line * (len(self._results) - len(self._logo))) + logo_padding_top = -height_diff // 2 + logo_padding_bottom = len(results) - (len(self._logo) + logo_padding_top) + + # Redraw the logo + self._output_logo_standalone(logo_padding_top, logo_padding_bottom) # When writing to a pipe (for instance), prevent `TextWrapper` from truncating output. if not sys.stdout.isatty(): text_width = cast(int, float("inf")) else: - text_width = get_terminal_size().columns - logo_width - len(self.__LOGO_RIGHT_PADDING) + text_width = get_terminal_size().columns - self.logo_width + + for idx, result in enumerate(results): + if isinstance(result, tuple): + results[idx] = result[1] + avail_term_height = get_terminal_size().lines - 1 # - 1 due to ending newline + results = self._wrap_text_list_to_width(text_width, results)[-avail_term_height:] + # todo: wrap text *and* logo (possibly warn if wrapping out the text entirely?) + + try: + for raw_line in results: + output_line = raw_line.format(c=self._colors) + str(Colors.CLEAR) + # Move the cursor to the end of the logo on this line + print(CursorPosition.move(TerminalMovements.FORWARD, self.logo_width), end="") + # Clear from this position to the end of the line + print(CursorPosition.move(TerminalMovements.ERASE_LINE), end="") + # Print the output + print(output_line, end="") + # Move the cursor down (safe, as the logo print has already made the new lines) + print(CursorPosition.move(TerminalMovements.NEXT_LINE, 1), end="") + + # Move the cursor to the top-left corner + height = max(len(results), self.logo_height) + print(CursorPosition.move(TerminalMovements.PREV_LINE, height), end="") + + except UnicodeError as unicode_error: + raise ArcheyException( + """\ + Your locale or TTY does not seem to support UTF-8 encoding. + Please disable Unicode within your configuration file.\ + """ + ) from unicode_error + @staticmethod + def _wrap_text_list_to_width(width: int, text: list[str]) -> list[str]: + """ + Wraps the list of `text` to the width `width` using `TextWrapper`, including handling ANSI + color escape codes. + """ text_wrapper = TextWrapper( - width=text_width, + width=width, expand_tabs=False, replace_whitespace=False, drop_whitespace=False, @@ -130,43 +250,79 @@ def _output_text(self) -> None: placeholder="...", ) placeholder_length = len(text_wrapper.placeholder) + results = [] - # Using `TextWrapper`, shortens each entry to remove any line overlapping - for i, entry in enumerate(self._results): - # Shortens the entry according to the terminal width. + for line in text: # We have to remove any ANSI color, or the result would be skewed. - wrapped_entry = text_wrapper.fill(Style.remove_colors(entry)) - placeholder_offset = ( - placeholder_length if wrapped_entry.endswith(text_wrapper.placeholder) else 0 + wrapped_line = text_wrapper.fill(Style.remove_colors(line)) + placeholder_end_offset = ( + placeholder_length if wrapped_line.endswith(text_wrapper.placeholder) else 0 ) # By using previous positions, re-inserts ANSI colors back in the wrapped string. - for color_match in ANSI_ECMA_REGEXP.finditer(entry): + for color_match in ANSI_ECMA_REGEXP.finditer(line): match_index = color_match.start() - if match_index <= len(wrapped_entry) - placeholder_offset: - wrapped_entry = ( - wrapped_entry[:match_index] + # Only re-insert before the wrapping placeholder is reached + if match_index <= len(wrapped_line) - placeholder_end_offset: + wrapped_line = ( + wrapped_line[:match_index] + color_match.group() - + wrapped_entry[match_index:] + + wrapped_line[match_index:] ) - # Add a color reset character before the placeholder (if any). - # Rationale : - # We cannot set `Colors.CLEAR` in the placeholder as it would skew its internals. - if placeholder_offset: - wrapped_entry = ( - wrapped_entry[:-placeholder_length] + # Add a color reset character before the placeholder (if one was inserted). + # Rationale : We cannot set `Colors.CLEAR` in the placeholder as it would skew the + # `TextWrapper` internals. + if placeholder_end_offset: + wrapped_line = ( + wrapped_line[:-placeholder_length] + str(Colors.CLEAR) - + wrapped_entry[-placeholder_length:] + + wrapped_line[-placeholder_length:] ) - self._results[i] = wrapped_entry + results.append(wrapped_line) + + return results + + def _output_legacy(self) -> None: + """ + Render the output entries all at once (legacy method). + It handles text centering additionally to value and colors replacing. + """ + # List to hold entry results + results = [] + # Iterate through the entries and get their content. + for entry in self._entries: + for entry_line in entry.pretty_value: + results.append( + f"{self._entries_color}{entry_line[0]}:{Colors.CLEAR} {entry_line[1]}" + ) + + # Let's center the entries and the logo vertically (handles odd numbers) + height_diff = len(self._logo) - len(results) + if height_diff >= 0: + results[0:0] = [""] * (height_diff // 2) + results.extend([""] * (len(self._logo) - len(results))) + else: + colored_empty_line = [str(self._colors[0]) + " " * self.logo_width] + self._logo[0:0] = colored_empty_line * (-height_diff // 2) + self._logo.extend(colored_empty_line * (len(results) - len(self._logo))) + + # When writing to a pipe (for instance), prevent `TextWrapper` from truncating output. + if not sys.stdout.isatty(): + text_width = cast(int, float("inf")) + else: + text_width = ( + get_terminal_size().columns - self.logo_width - len(self.__LOGO_RIGHT_PADDING) + ) + + results = self._wrap_text_list_to_width(text_width, results) # Merge entry results to the distribution logo. logo_with_entries = os.linesep.join( [ f"{logo_part}{self.__LOGO_RIGHT_PADDING}{entry_part}" - for logo_part, entry_part in zip(self._logo, self._results) + for logo_part, entry_part in zip(self._logo, results) ] ) diff --git a/config.json b/config.json index 4c6e24b8..37e954c7 100644 --- a/config.json +++ b/config.json @@ -1,6 +1,7 @@ { "allow_overriding": true, "parallel_loading": true, + "output_concurrency": true, "suppress_warnings": false, "entries_color": "", "honor_ansi_color": true,