diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index aebe4c4..5cffca3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -11,13 +11,12 @@ jobs: - name: Checkout uses: actions/checkout@v2 - - name: Install package in dev mode - run: pip install -e . - - - name: Install test requirements - run: pip install -r requirements_test.txt + - name: Install the project + run: make install + env: + TERM: xterm-256color - - name: Build - run: ./build.sh + - name: Run the build + run: make build env: TERM: xterm-256color diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..63608db --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +build: fmt +build: test + +fmt: + black . && flake8 . && isort . + +test: + pytest --cov=s3_browser + + +install: + pip install -e . + pip install -r requirements_test.txt diff --git a/README.md b/README.md index 3850f6b..d14520f 100644 --- a/README.md +++ b/README.md @@ -31,18 +31,16 @@ And then run with `s3-browser`. ### Running tests -Install the project into your virtualenv in development mode: +This project uses `make` for ease of use. You can install the project in development mode, +and install the test requirements, using the `install` target: ```bash -pip install -e . +make install ``` -Then install the test requirements: +It's recommended to create and activate a virtual environment first. There are a number of ways +to do that; I like [virtualenvwrapper](https://virtualenvwrapper.readthedocs.io/en/latest/). -```bash -pip install -r requirements_test.txt -``` - -and finally run `./build.sh` to run the full build. +Use `make` to run the full build. [usage-1]: readme-resources/usage-1.png "Usage example" diff --git a/build.sh b/build.sh deleted file mode 100755 index 7f98c3b..0000000 --- a/build.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash - -cd $(dirname "$0") - -# Run tests, coverage, and flake8 - -RED=$(tput setaf 1) -GREEN=$(tput setaf 2) -RESET=$(tput sgr0) - -_flake8() { - local exit_code=0 - - flake8 s3_browser setup.py - exit_code="$?" - - if [[ "$exit_code" == 0 ]]; then - echo "${GREEN}Flake8 check passed!$RESET" - fi - - return "$exit_code" -} - -mkdir -p build - -echo '' - -pytest --cov=s3_browser || exit 1 - -echo -n "$RED" -_flake8 | tee build/flake8.log || exit 2 -echo -n "$RESET" diff --git a/requirements_test.txt b/requirements_test.txt index 56718aa..bfc5573 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,4 +1,6 @@ -flake8>=3.7.0 +black==24.2.0 +flake8==7.0.0 +isort==5.13.2 coverage>=4.5.0 pytest>=7.2.0 pytest-cov>=4.0.0 diff --git a/s3_browser/argparse.py b/s3_browser/argparse.py index 2466526..a3e082d 100644 --- a/s3_browser/argparse.py +++ b/s3_browser/argparse.py @@ -10,6 +10,7 @@ class ArgumentParser(argparse.ArgumentParser): When we would normally exit safely, such as with --help, we'll add an extra flag to the parser so that the caller can determine it shouldn't proceed. """ + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.exited = False diff --git a/s3_browser/bookmarks.py b/s3_browser/bookmarks.py index d5e9a12..9c9955d 100644 --- a/s3_browser/bookmarks.py +++ b/s3_browser/bookmarks.py @@ -8,7 +8,7 @@ class BookmarkManager(object): - KEY_REGEX = re.compile('^[a-zA-Z0-9_][a-zA-Z0-9_-]{0,15}$') + KEY_REGEX = re.compile("^[a-zA-Z0-9_][a-zA-Z0-9_-]{0,15}$") def __init__(self, bookmark_file): self.bookmark_file = bookmark_file @@ -30,10 +30,7 @@ def add_bookmark(self, name, path): if bookmarks is None: return False - bookmarks[name] = Bookmark( - path=str(path), - created_on=datetime.datetime.now() - ) + bookmarks[name] = Bookmark(path=str(path), created_on=datetime.datetime.now()) self._bookmarks = bookmarks self.save() @@ -57,7 +54,7 @@ def validate_key(cls, k): return bool(cls.KEY_REGEX.match(k)) def clean_data(self, data): - bookmarks = data.get('bookmarks', {}) + bookmarks = data.get("bookmarks", {}) return {k: Bookmark(**v) for k, v in bookmarks.items()} def load(self): @@ -73,33 +70,31 @@ def load(self): # Don't try to read something we know isn't present; it's not an error # though, we'll try to save an initial copy when we add some bookmarks if not os.path.exists(ff): - logger.debug('No bookmark file %s, setting empty', ff) + logger.debug("No bookmark file %s, setting empty", ff) self._bookmarks = {} return True try: - with open(ff, 'r') as f: + with open(ff, "r") as f: data = self.clean_data(json.load(f)) except IOError: - logger.exception('Error reading bookmark file %s', ff) + logger.exception("Error reading bookmark file %s", ff) except ValueError: - logger.exception('Error reading contents of bookmark file %s', ff) + logger.exception("Error reading contents of bookmark file %s", ff) except AttributeError: - logger.exception('Error with bookmark file format (%s)', ff) + logger.exception("Error with bookmark file format (%s)", ff) else: - logger.debug('Successfully read %d bookmarks', len(data)) + logger.debug("Successfully read %d bookmarks", len(data)) self._bookmarks = data return data is not None def save(self): """Save bookmark data to file""" - data = { - 'bookmarks': {k: v.__dict__ for k, v in self.bookmarks.items()} - } + data = {"bookmarks": {k: v.__dict__ for k, v in self.bookmarks.items()}} data = json.dumps(data) - with open(self.bookmark_file, 'w') as f: + with open(self.bookmark_file, "w") as f: f.write(data) @@ -111,7 +106,7 @@ def __init__(self, path, created_on=None, *args, **kwargs): if isinstance(created_on, str): self.created_on = created_on else: - self.created_on = created_on.strftime('%Y-%m-%dT%H:%M:%SZ') + self.created_on = created_on.strftime("%Y-%m-%dT%H:%M:%SZ") def __str__(self): return self.path diff --git a/s3_browser/cli.py b/s3_browser/cli.py index 706ac9d..f4c8039 100755 --- a/s3_browser/cli.py +++ b/s3_browser/cli.py @@ -8,19 +8,14 @@ import sys import textwrap -from s3_browser import bookmarks -from s3_browser import client -from s3_browser import completion -from s3_browser import paths -from s3_browser import tokeniser -from s3_browser import utils +from s3_browser import bookmarks, client, completion, paths, tokeniser, utils from s3_browser.argparse import ArgumentParser as SafeParser logger = logging.getLogger(__name__) class Cli(object): - DEFAULT_PS1 = 's3://\001\x1b[36m\002{path_short}\001\x1b[0m\002> ' + DEFAULT_PS1 = "s3://\001\x1b[36m\002{path_short}\001\x1b[0m\002> " SPLASH = textwrap.dedent( """ Welcome to the interactive AWS S3 navigator. @@ -33,8 +28,22 @@ class Cli(object): ) RECOGNISED_COMMANDS = [ - 'bookmark', 'cat', 'cd', 'clear', 'exit', 'file', 'get', 'head', - 'help', 'll', 'ls', 'prompt', 'put', 'pwd', 'refresh', 'rm' + "bookmark", + "cat", + "cd", + "clear", + "exit", + "file", + "get", + "head", + "help", + "ll", + "ls", + "prompt", + "put", + "pwd", + "refresh", + "rm", ] def __init__( @@ -43,11 +52,11 @@ def __init__( working_dir=None, ps1=None, history_file=None, - bookmark_file=None + bookmark_file=None, ): self.history_file = history_file self.ps1 = ps1 or Cli.DEFAULT_PS1 - self.current_path = paths.S3Path.from_path(working_dir or '/') + self.current_path = paths.S3Path.from_path(working_dir or "/") self.client = client.S3Client(endpoint=endpoint) @@ -62,48 +71,50 @@ def __init__( @staticmethod def _err(msg): """Print a message in red""" - print('\x1b[31m{}\x1b[0m'.format(msg), file=sys.stderr) + print("\x1b[31m{}\x1b[0m".format(msg), file=sys.stderr) def normalise_path(self, path): # Render variables present in the path context = ( - {} if not self.bookmarks else - {k: v.path for k, v in self.bookmarks.bookmarks.items()} + {} + if not self.bookmarks + else {k: v.path for k, v in self.bookmarks.bookmarks.items()} ) path = tokeniser.render(tokeniser.tokenise(path), context) # Strip off the protocol prefix if provided - if path.startswith('s3://'): + if path.startswith("s3://"): path = path[5:] # Special case: ~ refers to the root of the current bucket - if path == '~' or path == '~/': + if path == "~" or path == "~/": return paths.S3Path(bucket=self.current_path.bucket, path=None) - path = os.path.join('/' + str(self.current_path), path) + path = os.path.join("/" + str(self.current_path), path) return paths.S3Path.from_path(path) - def cd(self, path=''): + def cd(self, path=""): full_path = self.normalise_path(path) if self.client.is_path(full_path): self.current_path = full_path return True - self._err('cannot access \'{}\': no such s3 directory'.format(path)) + self._err("cannot access '{}': no such s3 directory".format(path)) return False def ls(self, *args): - parser = SafeParser('ls') + parser = SafeParser("ls") parser.add_argument( - '-l', dest='full_details', action='store_true', - help='Use a long list format, including additional s3 metadata' + "-l", + dest="full_details", + action="store_true", + help="Use a long list format, including additional s3 metadata", ) parser.add_argument( - '-1', dest='oneline', action='store_true', - help='List one result per line' + "-1", dest="oneline", action="store_true", help="List one result per line" ) - parser.add_argument('path', default='', nargs='?') + parser.add_argument("path", default="", nargs="?") args = parser.parse_args(args) if parser.exited: @@ -120,10 +131,7 @@ def ls(self, *args): b = bookmarked.get(str(self.normalise_path(r.path_string))) r.bookmark = b - results = [ - str(r) if not args.full_details else r.full_details - for r in results - ] + results = [str(r) if not args.full_details else r.full_details for r in results] if args.oneline: for r in results: @@ -132,8 +140,8 @@ def ls(self, *args): utils.print_grid(results) def cat(self, *args): - parser = SafeParser('cat') - parser.add_argument('keys', nargs='+', help='S3 key(s) to concatenate') + parser = SafeParser("cat") + parser.add_argument("keys", nargs="+", help="S3 key(s) to concatenate") args = parser.parse_args(args) if parser.exited: @@ -146,8 +154,8 @@ def cat(self, *args): utils.print_object(obj) def rm(self, *args): - parser = SafeParser('rm') - parser.add_argument('keys', nargs='+', help='S3 key(s) to delete') + parser = SafeParser("rm") + parser.add_argument("keys", nargs="+", help="S3 key(s) to delete") args = parser.parse_args(args) if parser.exited: @@ -159,13 +167,9 @@ def rm(self, *args): self.client.rm(p) def put(self, *args): - parser = SafeParser('put') - parser.add_argument( - 'local_file', help='Local file to upload to S3' - ) - parser.add_argument( - 's3_key', nargs=1, help='S3 key at which to write the file' - ) + parser = SafeParser("put") + parser.add_argument("local_file", help="Local file to upload to S3") + parser.add_argument("s3_key", nargs=1, help="S3 key at which to write the file") args = parser.parse_args(args) if parser.exited: @@ -174,11 +178,9 @@ def put(self, *args): self.client.put(args.local_file, self.normalise_path(args.s3_key)) def get(self, *args): - parser = SafeParser('get') - parser.add_argument('s3_key', nargs=1, help='S3 key to download') - parser.add_argument( - 'local_path', help='Local destination for downloaded file' - ) + parser = SafeParser("get") + parser.add_argument("s3_key", nargs=1, help="S3 key to download") + parser.add_argument("local_path", help="Local destination for downloaded file") args = parser.parse_args(args) if parser.exited: @@ -188,69 +190,64 @@ def get(self, *args): local_file = args.local_path if os.path.isdir(args.local_path): - local_file = os.path.join( - args.local_path, - os.path.basename(s3_key.path) - ) + local_file = os.path.join(args.local_path, os.path.basename(s3_key.path)) self.client.get(s3_key, local_file) def add_bookmark(self, name, path): if not bookmarks.BookmarkManager.validate_key(name): - self._err('{} is an invalid name for a bookmark'.format(name)) + self._err("{} is an invalid name for a bookmark".format(name)) return path = self.normalise_path(path) if not self.client.is_path(path): - self._err( - 'cannot bookmark \'{}\': not an s3 directory'.format(path) - ) + self._err("cannot bookmark '{}': not an s3 directory".format(path)) return if not self.bookmarks.add_bookmark(name, path): - self._err('Failed to add bookmark') + self._err("Failed to add bookmark") return def remove_bookmark(self, name): if not self.bookmarks.remove_bookmark(name): - self._err('{} is not the name of a bookmark'.format(name)) + self._err("{} is not the name of a bookmark".format(name)) return False return True def list_bookmarks(self): for k, v in self.bookmarks.bookmarks.items(): - print('\x1b[33m${: <18}\x1b[0m {}'.format(k, str(v))) + print("\x1b[33m${: <18}\x1b[0m {}".format(k, str(v))) def bookmark_help(self): - print(textwrap.dedent( - """ + print( + textwrap.dedent( + """ Add, remove, or list bookmarks. add NAME PATH Add a bookmark called NAME pointing at PATH rm NAME Remove the named bookmark list, ls List all bookmarks """ - )) + ) + ) def bookmark(self, op, *args): if not self.bookmarks: - self._err('Bookmarks are unavailable') + self._err("Bookmarks are unavailable") return f = { - 'add': self.add_bookmark, - 'ls': self.list_bookmarks, - 'list': self.list_bookmarks, - 'help': self.bookmark_help, - 'rm': self.remove_bookmark + "add": self.add_bookmark, + "ls": self.list_bookmarks, + "list": self.list_bookmarks, + "help": self.bookmark_help, + "rm": self.remove_bookmark, }.get(op) if not f: - self._err( - 'Bad operation \'{}\'. Try help for correct usage'.format(op) - ) + self._err("Bad operation '{}'. Try help for correct usage".format(op)) return return f(*args) @@ -264,7 +261,7 @@ def print_head_data(self, key): data = self.client.head(key) data = utils.strip_s3_metadata(data) - print('\x1b[33m{}\x1b[0m'.format(key.canonical)) + print("\x1b[33m{}\x1b[0m".format(key.canonical)) print() utils.print_dict(data) @@ -272,14 +269,13 @@ def _render_prompt(self): return self.ps1.format( path=self.current_path, path_short=self.current_path.short_format, - path_end=( - self.current_path.name or self.current_path.bucket or '/' - ) + path_end=(self.current_path.name or self.current_path.bucket or "/"), ) def help(self): - print(textwrap.dedent( - """ + print( + textwrap.dedent( + """ Available commands: help Print this help message @@ -308,13 +304,14 @@ def help(self): Command history is available (stored in ~/.s3_browser_history) """ - )) + ) + ) def override_prompt(self, *args): if not args: self.ps1 = self.DEFAULT_PS1 else: - self.ps1 = ' '.join(args) + ' ' + self.ps1 = " ".join(args) + " " def exit(self): if self.history_file: @@ -324,7 +321,7 @@ def exit(self): def clear_cache(self): size = self.client.clear_cache() - print('Cleared {} cached paths.'.format(size)) + print("Cleared {} cached paths.".format(size)) def prompt(self): cmd = shlex.split(input(self._render_prompt())) @@ -332,36 +329,36 @@ def prompt(self): return def _ll(*args): - return self.ls('-1', *args) + return self.ls("-1", *args) func = { - 'bookmark': self.bookmark, - 'cat': self.cat, - 'cd': self.cd, - 'clear': lambda: os.system('clear'), - 'exit': self.exit, - 'file': self.print_head_data, - 'get': self.get, - 'head': self.print_head_data, - 'help': self.help, - 'll': _ll, - 'ls': self.ls, - 'prompt': self.override_prompt, - 'put': self.put, - 'pwd': lambda: print(self.current_path.canonical), - 'refresh': self.clear_cache, - 'rm': self.rm + "bookmark": self.bookmark, + "cat": self.cat, + "cd": self.cd, + "clear": lambda: os.system("clear"), + "exit": self.exit, + "file": self.print_head_data, + "get": self.get, + "head": self.print_head_data, + "help": self.help, + "ll": _ll, + "ls": self.ls, + "prompt": self.override_prompt, + "put": self.put, + "pwd": lambda: print(self.current_path.canonical), + "refresh": self.clear_cache, + "rm": self.rm, }.get(cmd[0]) if not func: - self._err('Unrecognised command: \'{}\''.format(cmd[0])) + self._err("Unrecognised command: '{}'".format(cmd[0])) return try: func(*cmd[1:]) except TypeError as e: self._err(str(e)) - logger.exception('Error while running command %s', cmd) + logger.exception("Error while running command %s", cmd) def read_loop(self): """The main start up + main loop of the cli""" @@ -374,60 +371,72 @@ def read_loop(self): try: self.prompt() except KeyboardInterrupt: - print('') + print("") except Exception as e: self._err(str(e)) - logger.exception('Unexpected error') + logger.exception("Unexpected error") def configure_debug_logging(): logging.basicConfig( - filename='/tmp/s3_browser.log', - format='%(asctime)s %(levelname)s %(module)s:%(funcName)s %(message)s', - level=logging.INFO + filename="/tmp/s3_browser.log", + format="%(asctime)s %(levelname)s %(module)s:%(funcName)s %(message)s", + level=logging.INFO, ) - logging.getLogger('s3_browser').setLevel(logging.DEBUG) + logging.getLogger("s3_browser").setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG) def main(): - parser = argparse.ArgumentParser('s3-browser') + parser = argparse.ArgumentParser("s3-browser") parser.add_argument( - '-p', '--prompt', dest='prompt', type=str, default=None, + "-p", + "--prompt", + dest="prompt", + type=str, + default=None, help=( - 'Prompt string to use; use the special patterns {path}, ' - '{path_short}, or {path_end} for displaying the current path' - ) + "Prompt string to use; use the special patterns {path}, " + "{path_short}, or {path_end} for displaying the current path" + ), ) parser.add_argument( - '-e', '--endpoint', type=str, default=None, + "-e", + "--endpoint", + type=str, + default=None, help=( - 'Optional endpoint URL to use if not the default Amazon S3 URL. ' - 'Hoststring like https://example.com:1234' - ) + "Optional endpoint URL to use if not the default Amazon S3 URL. " + "Hoststring like https://example.com:1234" + ), ) parser.add_argument( - '--bookmarks', dest='bookmark_file', type=str, - default='{}/.s3_browser_bookmarks'.format( - os.environ.get('HOME', '/etc') - ) + "--bookmarks", + dest="bookmark_file", + type=str, + default="{}/.s3_browser_bookmarks".format(os.environ.get("HOME", "/etc")), ) parser.add_argument( - '--history', dest='history_file', type=str, - default='{}/.s3_browser_history'.format(os.environ.get('HOME', '/etc')) + "--history", + dest="history_file", + type=str, + default="{}/.s3_browser_history".format(os.environ.get("HOME", "/etc")), ) parser.add_argument( - '--debug', dest='debug', action='store_true', default=False, - help='Turn on debug mode, logging information to /tmp/s3_browser.log' + "--debug", + dest="debug", + action="store_true", + default=False, + help="Turn on debug mode, logging information to /tmp/s3_browser.log", ) - parser.add_argument('working_dir', nargs='?', type=str, default='/') + parser.add_argument("working_dir", nargs="?", type=str, default="/") args = parser.parse_args() if args.debug: configure_debug_logging() - logger.info('Starting s3 browser in debug mode') + logger.info("Starting s3 browser in debug mode") else: logging.disable(logging.CRITICAL) @@ -436,9 +445,9 @@ def main(): working_dir=args.working_dir, ps1=args.prompt, history_file=args.history_file, - bookmark_file=args.bookmark_file + bookmark_file=args.bookmark_file, ).read_loop() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/s3_browser/client.py b/s3_browser/client.py index 4a55bd0..27f29a1 100644 --- a/s3_browser/client.py +++ b/s3_browser/client.py @@ -1,7 +1,7 @@ -import boto3 import logging import os +import boto3 import magic from s3_browser import paths @@ -15,8 +15,9 @@ class S3Client(object): the boto s3 client and adding memoisation and a more concise and path-like API """ + def __init__(self, endpoint=None): - self.boto = boto3.client('s3', endpoint_url=endpoint) + self.boto = boto3.client("s3", endpoint_url=endpoint) self.path_cache = {} self.mime_typer = magic.Magic(mime=True) @@ -55,8 +56,8 @@ def invalidate_cache(self, path): next_path.path = next - logger.debug('Clearing cache keys: %s', cache_keys) - logger.debug('Cache keys present: %s', self.path_cache.keys()) + logger.debug("Clearing cache keys: %s", cache_keys) + logger.debug("Cache keys present: %s", self.path_cache.keys()) for k in cache_keys: self.path_cache.pop(k, None) @@ -66,60 +67,58 @@ def ls(self, path, path_fragment=False): :type path: s3_browser.paths.S3Path """ - logger.debug('ls called: %s, %s', path, path_fragment) + logger.debug("ls called: %s, %s", path, path_fragment) cache_key = (path.canonical, path_fragment) cached = self.path_cache.get(cache_key) if cached is not None: - logger.debug('cache hit') + logger.debug("cache hit") return cached - logger.debug('cache miss') + logger.debug("cache miss") def _fetch(): if not path.bucket or not path.path and path_fragment: - logger.debug('Listing buckets') + logger.debug("Listing buckets") res = [ - paths.S3Bucket(b['Name']) - for b in self.boto.list_buckets().get('Buckets', []) + paths.S3Bucket(b["Name"]) + for b in self.boto.list_buckets().get("Buckets", []) ] if path.bucket: logger.debug('Trimming bucket list: "%s"', path.bucket) res = [r for r in res if r.bucket.startswith(path.bucket)] - logger.debug('Found buckets: %s', [str(r) for r in res]) + logger.debug("Found buckets: %s", [str(r) for r in res]) return res if not path_fragment: - search_path = path.path + '/' if path.path else '' + search_path = path.path + "/" if path.path else "" else: - search_path = path.path or '' + search_path = path.path or "" - last_slash = search_path.rfind('/') + last_slash = search_path.rfind("/") search_len = last_slash + 1 if last_slash != -1 else 0 logger.debug( - 'Listing objects. full path: "%s", search_path: "%s"', - path, search_path + 'Listing objects. full path: "%s", search_path: "%s"', path, search_path ) # TODO: [ab]use pagination (see boto/boto3#134) res = self.boto.list_objects( - Bucket=path.bucket, - Prefix=search_path, - Delimiter='/' + Bucket=path.bucket, Prefix=search_path, Delimiter="/" ) prefixes = [ - paths.S3Prefix(r['Prefix'][search_len:]) - for r in res.get('CommonPrefixes', []) + paths.S3Prefix(r["Prefix"][search_len:]) + for r in res.get("CommonPrefixes", []) ] keys = [ - paths.S3Key(r['Key'][search_len:], r['LastModified']) - for r in res.get('Contents', []) - if r['Key'] != search_path + paths.S3Key(r["Key"][search_len:], r["LastModified"]) + for r in res.get("Contents", []) + if r["Key"] != search_path ] logger.debug( - 'results: prefixes: %s -- keys: %s', - [str(p) for p in prefixes], [str(k) for k in keys] + "results: prefixes: %s -- keys: %s", + [str(p) for p in prefixes], + [str(k) for k in keys], ) return prefixes + keys @@ -137,7 +136,7 @@ def head(self, path): else: res = self.boto.head_object(Bucket=path.bucket, Key=path.path) - logger.debug('Head %s: response = %s', path, res) + logger.debug("Head %s: response = %s", path, res) return res def rm(self, path): @@ -148,27 +147,21 @@ def rm(self, path): def put(self, f, dest): """Write a file to an S3Path""" content_type = self.mime_typer.from_file(f) - logger.debug( - 'Uploading %s to %s with content-type %s', f, dest, content_type - ) + logger.debug("Uploading %s to %s with content-type %s", f, dest, content_type) self.boto.upload_file( Filename=f, Bucket=dest.bucket, Key=dest.path, - ExtraArgs={'ContentType': content_type} + ExtraArgs={"ContentType": content_type}, ) self.invalidate_cache(dest) def get(self, key, dest): """Download a key (S3Path) to a local file""" - logger.debug('Downloading %s to %s', key, dest) - self.boto.download_file( - Bucket=key.bucket, - Key=key.path, - Filename=dest - ) + logger.debug("Downloading %s to %s", key, dest) + self.boto.download_file(Bucket=key.bucket, Key=key.path, Filename=dest) def get_object(self, path): """Get a full object at a path""" diff --git a/s3_browser/completion.py b/s3_browser/completion.py index 843fd83..912edc6 100644 --- a/s3_browser/completion.py +++ b/s3_browser/completion.py @@ -10,8 +10,9 @@ class CliCompleter(object): """ Tab-complete functionality for the cli """ - EXPECTS_KEY = {'cat', 'file', 'head', 'rm'} - EXPECTS_S3_PATH = {'cd', 'ls', 'll'}.union(EXPECTS_KEY) + + EXPECTS_KEY = {"cat", "file", "head", "rm"} + EXPECTS_S3_PATH = {"cd", "ls", "ll"}.union(EXPECTS_KEY) def __init__(self, cli): self.cli = cli @@ -26,7 +27,7 @@ def _split_cmd(self, buf): """ # Single empty command to complete, if we haven't started typing at all if not buf: - return [''] + return [""] # FIXME: Can we do this analysis more intelligently using shlex.shlex # and scanning / tokenising directly? @@ -37,15 +38,15 @@ def _split_cmd(self, buf): # Deal with the case where we end with a space, and will want to # complete the next argument with an empty string input - if buf.endswith(' '): - result.append('') + if buf.endswith(" "): + result.append("") return result except ValueError as e: logger.debug( - 'Error while splitting with shlex: %s, trying with ending ' - 'double-quote', - e + "Error while splitting with shlex: %s, trying with ending " + "double-quote", + e, ) # Try with an ending double quote to complete an unclosed double-quoted @@ -58,15 +59,15 @@ def _split_cmd(self, buf): return shlex.split(buf + '"') except ValueError as e: logger.debug( - 'Still failed splitting with shlex: %s, trying with ending ' - 'single-quote', - e + "Still failed splitting with shlex: %s, trying with ending " + "single-quote", + e, ) try: - return shlex.split(buf + '\'') + return shlex.split(buf + "'") except ValueError as e: - logger.error('Failed last attempt at splitting with shlex: %s', e) + logger.error("Failed last attempt at splitting with shlex: %s", e) raise def complete_command(self, cmd, state): @@ -74,9 +75,7 @@ def complete_command(self, cmd, state): Complete a command if we're just starting to write a command (i.e. no spaces in the command yet) """ - matches = [ - c for c in self.cli.RECOGNISED_COMMANDS if c.startswith(cmd) - ] + matches = [c for c in self.cli.RECOGNISED_COMMANDS if c.startswith(cmd)] if state < len(matches): return matches[state] @@ -100,8 +99,8 @@ def complete_s3_path(self, partial, state, allow_keys=False): """ # ~ is a special case referring to the root of the current bucket, # so just add a forward slash to continue the path from that root - if partial == '~': - return '~/' if state == 0 else None + if partial == "~": + return "~/" if state == 0 else None special_results = [] search_term = None @@ -112,17 +111,17 @@ def complete_s3_path(self, partial, state, allow_keys=False): # relative meanings of those terms. In which case, we need to look for # files with that prefix in the directory above them, rather than # following the relative paths, as well as suggesting ./ or ../ - if basename in {'.', '..'}: - special_results.append(basename + '/') + if basename in {".", ".."}: + special_results.append(basename + "/") search_term = self.cli.normalise_path(os.path.dirname(partial)) search_term.path = os.path.join(search_term.path, basename) else: search_term = self.cli.normalise_path(partial) hits = [ - shlex.quote(str(r)) for r in self.s3_client.ls( - search_term, - path_fragment=not partial.endswith('/') + shlex.quote(str(r)) + for r in self.s3_client.ls( + search_term, path_fragment=not partial.endswith("/") ) if allow_keys or not r.is_key ] @@ -135,23 +134,18 @@ def complete_local_path(self, partial, state): Autocomplete for an expected local filesystem path """ if os.path.isfile(partial): - return ( - shlex.quote(os.path.basename(partial)) if state == 0 else None - ) + return shlex.quote(os.path.basename(partial)) if state == 0 else None hits = [] - if partial.endswith('/') and os.path.isdir(partial): + if partial.endswith("/") and os.path.isdir(partial): hits = os.listdir(partial) else: parent = os.path.dirname(partial) frag = os.path.basename(partial) if not parent or os.path.isdir(parent): - hits = [ - h for h in os.listdir(parent or '.') - if h.startswith(frag) - ] + hits = [h for h in os.listdir(parent or ".") if h.startswith(frag)] return shlex.quote(hits[state]) if state < len(hits) else None @@ -162,7 +156,7 @@ def complete_put_get(self, words, state, s3_first): one we should be completing by the current argument count, ignoring any flags. """ - args = [w for w in words[1:] if not w.startswith('-')] + args = [w for w in words[1:] if not w.startswith("-")] arg_count = len(args) if s3_first and arg_count == 1 or not s3_first and arg_count == 2: @@ -190,22 +184,18 @@ def complete(self, text, state): if len(words) == 1: return self.complete_command(cmd, state) - if cmd == 'put': + if cmd == "put": return self.complete_put_get(words, state, s3_first=False) - if cmd == 'get': + if cmd == "get": return self.complete_put_get(words, state, s3_first=True) if cmd in self.EXPECTS_S3_PATH: - return self.complete_s3_path( - words[-1], - state, - cmd in self.EXPECTS_KEY - ) + return self.complete_s3_path(words[-1], state, cmd in self.EXPECTS_KEY) return None def bind(self): - readline.set_completer_delims(' \t\n/;') - readline.parse_and_bind('tab: complete') + readline.set_completer_delims(" \t\n/;") + readline.parse_and_bind("tab: complete") readline.set_completer(self.complete) diff --git a/s3_browser/paths.py b/s3_browser/paths.py index cc416a3..0514ce5 100644 --- a/s3_browser/paths.py +++ b/s3_browser/paths.py @@ -8,7 +8,7 @@ def _annotate_bookmark(label, bookmark=None): if not bookmark: return label - return '\x1b[33m${}\x1b[0m {}'.format(bookmark, label) + return "\x1b[33m${}\x1b[0m {}".format(bookmark, label) class S3Path(object): @@ -17,50 +17,51 @@ class S3Path(object): Intended to be used to track an s3 location being visited or checked. """ + def __init__(self, bucket, path): self.bucket = bucket - self.path = os.path.realpath('/{}'.format(path))[1:] if path else None - self.name = self.path.split('/')[-1] or None if self.path else None + self.path = os.path.realpath("/{}".format(path))[1:] if path else None + self.name = self.path.split("/")[-1] or None if self.path else None @staticmethod def from_path(path): stripped = path - if stripped.startswith('s3://'): + if stripped.startswith("s3://"): stripped = path[5:] - stripped = stripped.strip('/') + stripped = stripped.strip("/") if not stripped: return S3Path(None, None) - comp = stripped.split('/') - return S3Path(comp[0], '/'.join(comp[1:])) + comp = stripped.split("/") + return S3Path(comp[0], "/".join(comp[1:])) @property def short_format(self): if not self.bucket: - return '/' + return "/" - if self.path and '/' in self.path: - return '{}/…/{}'.format(self.bucket, self.name) + if self.path and "/" in self.path: + return "{}/…/{}".format(self.bucket, self.name) - return '{}/{}'.format(self.bucket, self.path or '') + return "{}/{}".format(self.bucket, self.path or "") @property def canonical(self): """Full path as accepted by the cli, with s3:// protocol specified""" if not self.bucket: - return 's3://' + return "s3://" - return 's3://{}/{}'.format(self.bucket, self.path or '') + return "s3://{}/{}".format(self.bucket, self.path or "") def __eq__(self, other): return self.canonical == other.canonical def __str__(self): if not self.bucket: - return '/' + return "/" - return '/{}/{}'.format(self.bucket, self.path or '') + return "/{}/{}".format(self.bucket, self.path or "") class S3Bucket(object): @@ -69,6 +70,7 @@ class S3Bucket(object): Primarily just to match the S3Prefix and S3Key API """ + def __init__(self, bucket): self.bucket = bucket self.bookmark = None @@ -81,15 +83,15 @@ def full_details(self): Designed to line up with S3Key's implementation of the same method """ - label = _annotate_bookmark('BUCKET', self.bookmark) - return '{: >19} {}'.format(label, self.bucket) + label = _annotate_bookmark("BUCKET", self.bookmark) + return "{: >19} {}".format(label, self.bucket) @property def path_string(self): """ Prefix the bucket value with / to indicate it's absolute (top-level) """ - return '/' + self.bucket + return "/" + self.bucket def __str__(self): return self.bucket @@ -103,6 +105,7 @@ class S3Prefix(object): absolute prefix to the destination; it is a wrapper around a prefix result and is only useful in the context of a particular query """ + def __init__(self, prefix): self.prefix = prefix self.bookmark = None @@ -115,8 +118,8 @@ def full_details(self): Designed to line up with S3Key's implementation of the same method """ - label = _annotate_bookmark('PREFIX', self.bookmark) - return '{: >19} {}'.format(label, self.prefix) + label = _annotate_bookmark("PREFIX", self.bookmark) + return "{: >19} {}".format(label, self.prefix) @property def path_string(self): @@ -137,18 +140,18 @@ class S3Key(object): it is a wrapper around a key result and is only useful in the context of a particular query """ + def __init__(self, key, updated_on=None): self.key = key self.updated_on = ( - updated_on.strftime('%Y-%m-%d %H:%M:%S') if updated_on else None + updated_on.strftime("%Y-%m-%d %H:%M:%S") if updated_on else None ) self.is_key = True @property def full_details(self): - return '{updated_on: >19} {key}'.format( - updated_on=self.updated_on or '', - key=self.key + return "{updated_on: >19} {key}".format( + updated_on=self.updated_on or "", key=self.key ) @property diff --git a/s3_browser/tests/test_bookmarks.py b/s3_browser/tests/test_bookmarks.py index a84453a..1b15f8d 100644 --- a/s3_browser/tests/test_bookmarks.py +++ b/s3_browser/tests/test_bookmarks.py @@ -8,14 +8,14 @@ class TestBookmarks: - FILE_PREFIX = 's3_browser_tests_' + FILE_PREFIX = "s3_browser_tests_" data = { - 'bookmarks': { - 'foo': { - 'path': '/test-bucket/bar/baz', - 'created_on': '2019-01-01 00:00:00', - 'bogus_extra_data': 'hodor' + "bookmarks": { + "foo": { + "path": "/test-bucket/bar/baz", + "created_on": "2019-01-01 00:00:00", + "bogus_extra_data": "hodor", } } } @@ -24,33 +24,30 @@ class TestBookmarks: def tear_down(self): """Clean up temporary bookmark files""" yield - for f in os.listdir('/tmp'): + for f in os.listdir("/tmp"): if not f.startswith(self.FILE_PREFIX): continue - os.remove(os.path.join('/tmp', f)) + os.remove(os.path.join("/tmp", f)) @classmethod def gen_filename(cls): - return '/tmp/{}{}.json'.format(cls.FILE_PREFIX, uuid.uuid4()) + return "/tmp/{}{}.json".format(cls.FILE_PREFIX, uuid.uuid4()) @property def expected_bookmarks(self): - return self.normalise_bookmarks({ - k: bookmarks.Bookmark(**v) - for k, v in self.data['bookmarks'].items() - }) + return self.normalise_bookmarks( + {k: bookmarks.Bookmark(**v) for k, v in self.data["bookmarks"].items()} + ) def normalise_bookmarks(self, data): """Normalise bookmark data as dicts for easy comparison""" - return { - k: v.__dict__ for k, v in data.items() - } + return {k: v.__dict__ for k, v in data.items()} def write_fixture(self, f, data=None): data = data or self.data - with open(f, 'w') as ff: + with open(f, "w") as ff: json.dump(data, ff) def test_read_bookmarks_file(self): @@ -66,7 +63,7 @@ def test_clean_bookmark_data(self): """Should ignore unexpected fields in the bookmark file""" f = self.gen_filename() data = self.data.copy() - data['bookmarks']['foo']['hodor'] = 'foo' + data["bookmarks"]["foo"]["hodor"] = "foo" self.write_fixture(f) manager = bookmarks.BookmarkManager(f) @@ -82,16 +79,17 @@ def test_missing_bookmark_file(self): def test_add_bookmarks(self): f = self.gen_filename() manager = bookmarks.BookmarkManager(f) - manager.add_bookmark('foo', '/hodor/hodor/hodor') - manager.add_bookmark('bar', '/hodor/hodor') - manager.add_bookmark('baz', '/hodor') + manager.add_bookmark("foo", "/hodor/hodor/hodor") + manager.add_bookmark("bar", "/hodor/hodor") + manager.add_bookmark("baz", "/hodor") actual = manager.bookmarks - assert actual.keys() == {'foo', 'bar', 'baz'} - assert ( - {v.path for v in actual.values()} == - {'/hodor', '/hodor/hodor', '/hodor/hodor/hodor'} - ) + assert actual.keys() == {"foo", "bar", "baz"} + assert {v.path for v in actual.values()} == { + "/hodor", + "/hodor/hodor", + "/hodor/hodor/hodor", + } for v in actual.values(): assert v.created_on is not None @@ -104,7 +102,7 @@ def test_remove_bookmarks(self): actual = self.normalise_bookmarks(manager.bookmarks) assert actual == self.expected_bookmarks - for b in self.data['bookmarks'].keys(): + for b in self.data["bookmarks"].keys(): manager.remove_bookmark(b) is True actual = self.normalise_bookmarks(manager.bookmarks) @@ -113,8 +111,8 @@ def test_remove_bookmarks(self): def test_remove_missing_bookmark(self): f = self.gen_filename() man = bookmarks.BookmarkManager(f) - man.add_bookmark('awesome_bookmark', 'amazing/path') - assert man.remove_bookmark('lame_bookmark') is False + man.add_bookmark("awesome_bookmark", "amazing/path") + assert man.remove_bookmark("lame_bookmark") is False def test_save_bookmarks(self): f = self.gen_filename() @@ -125,10 +123,10 @@ def test_save_bookmarks(self): man2.load() # Bookmarks are written to disk eagerly as they're added / removed - man1.add_bookmark('mighty_bookmark', '/valley/of/strength') - man1.add_bookmark('feeble_bookmark', '/plain/of/wimpiness') - man1.add_bookmark('average_bookmark', '/hill/of/normality') - man1.remove_bookmark('average_bookmark') + man1.add_bookmark("mighty_bookmark", "/valley/of/strength") + man1.add_bookmark("feeble_bookmark", "/plain/of/wimpiness") + man1.add_bookmark("average_bookmark", "/hill/of/normality") + man1.remove_bookmark("average_bookmark") expected = self.normalise_bookmarks(man1.bookmarks) # Check the second instance is indeed empty @@ -140,8 +138,8 @@ def test_save_bookmarks(self): def test_validate_bookmark_key(self): """Key names should be checked against a pattern""" - valid_names = ['hodor', 'ostrich', 'potato123', 'dashy-key'] - invalid_names = ['thisnameisabittoolong', 'funny/characters', '-flag'] + valid_names = ["hodor", "ostrich", "potato123", "dashy-key"] + invalid_names = ["thisnameisabittoolong", "funny/characters", "-flag"] for n in valid_names: assert bookmarks.BookmarkManager.validate_key(n) is True @@ -157,23 +155,23 @@ def test_unreadable_bookmark_file(self): man = bookmarks.BookmarkManager(f) assert man.bookmarks is None - assert man.add_bookmark('nope', 'nope/nope/nope') is False + assert man.add_bookmark("nope", "nope/nope/nope") is False def test_malformed_bookmark_file(self): """If the JSON is malformed, refuse to support bookmarks""" f = self.gen_filename() - with open(f, 'w') as ff: - ff.write('bad json') + with open(f, "w") as ff: + ff.write("bad json") man = bookmarks.BookmarkManager(f) assert man.bookmarks is None - assert man.add_bookmark('nope', 'nope/nope/nope') is False + assert man.add_bookmark("nope", "nope/nope/nope") is False def test_bad_bookmark_file_data(self): """If the JSON has a bad structure, refuse to support bookmarks""" f = self.gen_filename() - self.write_fixture(f, {'bookmarks': 'should be an object!'}) + self.write_fixture(f, {"bookmarks": "should be an object!"}) man = bookmarks.BookmarkManager(f) assert man.bookmarks is None - assert man.add_bookmark('nope', 'nope/nope/nope') is False + assert man.add_bookmark("nope", "nope/nope/nope") is False diff --git a/s3_browser/tests/test_completion.py b/s3_browser/tests/test_completion.py index 7f29ff8..23ea796 100644 --- a/s3_browser/tests/test_completion.py +++ b/s3_browser/tests/test_completion.py @@ -1,14 +1,10 @@ import os import shlex -from unittest.mock import MagicMock -from unittest.mock import patch - -import pytest +from unittest.mock import MagicMock, patch from s3_browser.cli import Cli from s3_browser.completion import CliCompleter -from s3_browser.paths import S3Key -from s3_browser.paths import S3Prefix +from s3_browser.paths import S3Key, S3Prefix class TestCompletion: @@ -28,87 +24,85 @@ def _complete(self, completer, get_line_buffer, text, state): get_line_buffer.reset_mock() return res - @patch('readline.get_line_buffer') + @patch("readline.get_line_buffer") def test_complete_empty_command(self, mock): """Tab on an empty string should list all commands""" completer = self._completer() for i, cmd in enumerate(Cli.RECOGNISED_COMMANDS): - assert self._complete(completer, mock, '', i) == cmd + assert self._complete(completer, mock, "", i) == cmd - @patch('readline.get_line_buffer') + @patch("readline.get_line_buffer") def test_complete_partial_command(self, mock): completer = self._completer() - assert self._complete(completer, mock, 'c', 0) == 'cat' - assert self._complete(completer, mock, 'c', 1) == 'cd' - assert self._complete(completer, mock, 'c', 2) == 'clear' - assert self._complete(completer, mock, 'bo', 0) == 'bookmark' + assert self._complete(completer, mock, "c", 0) == "cat" + assert self._complete(completer, mock, "c", 1) == "cd" + assert self._complete(completer, mock, "c", 2) == "clear" + assert self._complete(completer, mock, "bo", 0) == "bookmark" - @patch('readline.get_line_buffer') + @patch("readline.get_line_buffer") def test_complete_s3_path_commands(self, mock): """Tab on several commands should complete S3 paths or keys""" completer = self._completer() - prefixes = [S3Prefix('ash'), S3Prefix('mia')] - files = [S3Key('tric.txt')] + prefixes = [S3Prefix("ash"), S3Prefix("mia")] + files = [S3Key("tric.txt")] expected_paths = [str(p) for p in prefixes] + [None] - expected_files = ( - [str(p) for p in prefixes] + [str(f) for f in files] + [None] - ) + expected_files = [str(p) for p in prefixes] + [str(f) for f in files] + [None] completer.cli.client.ls.return_value = prefixes + files for i, p in enumerate(expected_paths): - assert self._complete(completer, mock, 'cd ', i) == p - assert self._complete(completer, mock, 'ls ', i) == p - assert self._complete(completer, mock, 'll ', i) == p + assert self._complete(completer, mock, "cd ", i) == p + assert self._complete(completer, mock, "ls ", i) == p + assert self._complete(completer, mock, "ll ", i) == p for i, f in enumerate(expected_files): - assert self._complete(completer, mock, 'cat ', i) == f - assert self._complete(completer, mock, 'cat ./ ', i) == f - assert self._complete(completer, mock, 'file ', i) == f - assert self._complete(completer, mock, 'get ', i) == f - assert self._complete(completer, mock, 'head ', i) == f - assert self._complete(completer, mock, 'put ./ ', i) == f - assert self._complete(completer, mock, 'rm ', i) == f - assert self._complete(completer, mock, 'rm ./ ', i) == f + assert self._complete(completer, mock, "cat ", i) == f + assert self._complete(completer, mock, "cat ./ ", i) == f + assert self._complete(completer, mock, "file ", i) == f + assert self._complete(completer, mock, "get ", i) == f + assert self._complete(completer, mock, "head ", i) == f + assert self._complete(completer, mock, "put ./ ", i) == f + assert self._complete(completer, mock, "rm ", i) == f + assert self._complete(completer, mock, "rm ./ ", i) == f # . and .. should suggest the relative dirs and also any s3 key hits # Note that it'd be limited to dot-prefixed paths in reality, but our # mock always returns expected_files for a key search in this case - for i, f in enumerate(['./'] + expected_files): - assert self._complete(completer, mock, 'cat .', i) == f + for i, f in enumerate(["./"] + expected_files): + assert self._complete(completer, mock, "cat .", i) == f - for i, f in enumerate(['../'] + expected_files): - assert self._complete(completer, mock, 'cat ..', i) == f + for i, f in enumerate(["../"] + expected_files): + assert self._complete(completer, mock, "cat ..", i) == f - @patch('readline.get_line_buffer') + @patch("readline.get_line_buffer") def test_complete_local_path(self, mock): """Tab on put should complete s3 path or local path arguments""" completer = self._completer() - files = [shlex.quote(f) for f in os.listdir('.')] + files = [shlex.quote(f) for f in os.listdir(".")] for i, f in enumerate(files): - assert self._complete(completer, mock, 'put ', i) == f - assert self._complete(completer, mock, 'get . ', i) == f + assert self._complete(completer, mock, "put ", i) == f + assert self._complete(completer, mock, "get . ", i) == f - @patch('readline.get_line_buffer') + @patch("readline.get_line_buffer") def test_complete_paths_with_quotes(self, mock): """Tab complete should work where paths need quoting""" completer = self._completer() - completer.cli.client.ls.return_value = [S3Key('argh spaces.txt')] + completer.cli.client.ls.return_value = [S3Key("argh spaces.txt")] partials = [ - 'cat ', - 'cat a', - 'cat arg', - 'cat argh', + "cat ", + "cat a", + "cat arg", + "cat argh", 'cat "argh spaces', - 'cat \'argh spaces', + "cat 'argh spaces", 'cat "argh spaces"', - 'cat \'argh spaces\'' + "cat 'argh spaces'", ] - expected = '\'argh spaces.txt\'' + expected = "'argh spaces.txt'" for p in partials: assert self._complete(completer, mock, p, 0) == expected diff --git a/s3_browser/tests/test_paths.py b/s3_browser/tests/test_paths.py index f8e0de0..b5038d6 100644 --- a/s3_browser/tests/test_paths.py +++ b/s3_browser/tests/test_paths.py @@ -1,9 +1,4 @@ -import unittest - -from s3_browser.paths import S3Bucket -from s3_browser.paths import S3Key -from s3_browser.paths import S3Path -from s3_browser.paths import S3Prefix +from s3_browser.paths import S3Bucket, S3Key, S3Path, S3Prefix def _test_s3_object_api(obj): @@ -14,64 +9,70 @@ def _test_s3_object_api(obj): it in a few ways, and it must render without error both with and without a bookmark annotation declared """ + def basic_checks(): assert obj.is_key is not None assert obj.full_details is not None assert obj.path_string is not None basic_checks() - obj.bookmark = 'my_bookmark' + obj.bookmark = "my_bookmark" basic_checks() + def test_s3_path_from_path_string(): """S3Path should be created properly from various path strings""" tests = [ - ('', S3Path(None, None)), - ('/', S3Path(None, None)), - ('a/b/c/d/e/f/g', S3Path('a', 'b/c/d/e/f/g')), - ('/hodor-hodor', S3Path('hodor-hodor', None)), - ('s3://hodor-hodor', S3Path('hodor-hodor', None)), + ("", S3Path(None, None)), + ("/", S3Path(None, None)), + ("a/b/c/d/e/f/g", S3Path("a", "b/c/d/e/f/g")), + ("/hodor-hodor", S3Path("hodor-hodor", None)), + ("s3://hodor-hodor", S3Path("hodor-hodor", None)), ( - 's3://hodorhodor/hodor/hodor/hodor.txt', - S3Path('hodorhodor', 'hodor/hodor/hodor.txt') - ) + "s3://hodorhodor/hodor/hodor/hodor.txt", + S3Path("hodorhodor", "hodor/hodor/hodor.txt"), + ), ] for input, expected in tests: assert S3Path.from_path(input) == expected + def test_s3_path_short_format(): """S3Path should render a concise format for ease of use in prompts""" tests = [ - ('/', '/'), - ('a/b/c/d/e/f/g', 'a/…/g'), + ("/", "/"), + ("a/b/c/d/e/f/g", "a/…/g"), ( - 'something-pretty-long/middle/end-of-long-thing', - 'something-pretty-long/…/end-of-long-thing' # TODO: improve? + "something-pretty-long/middle/end-of-long-thing", + "something-pretty-long/…/end-of-long-thing", # TODO: improve? ), - ('foo/bar', 'foo/bar') + ("foo/bar", "foo/bar"), ] for input, expected in tests: assert S3Path.from_path(input).short_format == expected + def test_s3_bucket_api(): """S3Bucket should support the defined S3 object API""" - bucket = S3Bucket('westeros') + bucket = S3Bucket("westeros") _test_s3_object_api(bucket) assert bucket.is_key is False + def test_s3_prefix_api(): """S3Prefix should support the defined S3 object API""" - prefix = S3Prefix('winterfell/stark') + prefix = S3Prefix("winterfell/stark") _test_s3_object_api(prefix) assert prefix.is_key is False + def test_s3_key_api(): """S3Key should support the defined S3 object API""" - key = S3Key('winterfell/stark/arya.json') + key = S3Key("winterfell/stark/arya.json") _test_s3_object_api(key) assert key.is_key is True diff --git a/s3_browser/tests/test_tokeniser.py b/s3_browser/tests/test_tokeniser.py index c04f50c..2f76131 100644 --- a/s3_browser/tests/test_tokeniser.py +++ b/s3_browser/tests/test_tokeniser.py @@ -1,39 +1,32 @@ import pytest -from s3_browser.tokeniser import RawString as S -from s3_browser.tokeniser import Token as T -from s3_browser.tokeniser import TokeniserException -from s3_browser.tokeniser import render -from s3_browser.tokeniser import tokenise +from s3_browser.tokeniser import ( + RawString as S, + render, + Token as T, + tokenise, + TokeniserException +) def test_tokeniser(): """Test that the tokeniser works for various variable combinations""" tests = ( - ('literal string ok', [S('literal string ok')]), - (' whitespacey ', [S(' whitespacey ')]), - (r'\\\\', [S(r'\\\\')]), + ("literal string ok", [S("literal string ok")]), + (" whitespacey ", [S(" whitespacey ")]), + (r"\\\\", [S(r"\\\\")]), + (r"escaped \$ \$dollar \$signs \$\$", [S("escaped $ $dollar $signs $$")]), + ("$var$variable$foo", [T("var"), T("variable"), T("foo")]), + ("$var $variable $foo", [T("var"), S(" "), T("variable"), S(" "), T("foo")]), ( - r'escaped \$ \$dollar \$signs \$\$', - [S('escaped $ $dollar $signs $$')] + "${brace yourself} winter is coming", + [T("brace yourself"), S(" winter is coming")], ), ( - '$var$variable$foo', - [T('var'), T('variable'), T('foo')] + "${$$$inside all is literal$$$}${}", + [T("$$$inside all is literal$$$"), T("")], ), - ( - '$var $variable $foo', - [T('var'), S(' '), T('variable'), S(' '), T('foo')] - ), - ( - '${brace yourself} winter is coming', - [T('brace yourself'), S(' winter is coming')] - ), - ( - '${$$$inside all is literal$$$}${}', - [T('$$$inside all is literal$$$'), T('')] - ), - ('end on a $', [S('end on a ')]) + ("end on a $", [S("end on a ")]), ) for t in tests: @@ -42,33 +35,29 @@ def test_tokeniser(): assert actual == expected + def test_tokeniser_failures(): """Test that the tokeniser fails for malformed variables""" with pytest.raises(TokeniserException): - tokenise('${wut') + tokenise("${wut") + def test_render(): """Test that tokens can be correctly rendered back into a string""" - context = { - 'foo': 'hodor', - 'bar': 'arya', - 'baz': 'bran' - } + context = {"foo": "hodor", "bar": "arya", "baz": "bran"} tests = ( - ([T('foo'), S(' hodor '), T('foo')], 'hodor hodor hodor'), - ([S('simple string')], 'simple string'), - ([S(r'\\\\\\')], r'\\\\\\'), - ( - [T('bar'), S('\'s brother is '), T('baz')], - 'arya\'s brother is bran' - ) + ([T("foo"), S(" hodor "), T("foo")], "hodor hodor hodor"), + ([S("simple string")], "simple string"), + ([S(r"\\\\\\")], r"\\\\\\"), + ([T("bar"), S("'s brother is "), T("baz")], "arya's brother is bran"), ) for t in tests: assert render(t[0], context) == t[1] + def test_render_unknown_token(): """Test that rendering fails for unknown tokens""" with pytest.raises(TokeniserException): - render([T('foo')], {'bar': 'baz'}) + render([T("foo")], {"bar": "baz"}) diff --git a/s3_browser/tests/test_utils.py b/s3_browser/tests/test_utils.py index d1e870f..75d8b02 100644 --- a/s3_browser/tests/test_utils.py +++ b/s3_browser/tests/test_utils.py @@ -8,15 +8,15 @@ class UtilsTest(unittest.TestCase): def test_pretty_size(self): """Test that the pretty-size util approximates filesizes correctly""" cases = [ - (0, '0 B'), - (233, '233 B'), - (1023, '1023 B'), - (1024, '1 KB'), - (1024 ** 2 - 1, '1 MB'), - (12345678, '12 MB'), - (1024 ** 3 + 100, '1 GB'), - (1024 ** 4 + 1, '1 TB'), - (1024 ** 5 * 2, '2048 TB'), + (0, "0 B"), + (233, "233 B"), + (1023, "1023 B"), + (1024, "1 KB"), + (1024**2 - 1, "1 MB"), + (12345678, "12 MB"), + (1024**3 + 100, "1 GB"), + (1024**4 + 1, "1 TB"), + (1024**5 * 2, "2048 TB"), ] for v, expected in cases: @@ -28,36 +28,36 @@ def test_strip_s3_metadata(self): # Anonymised sample response from a head_object call with boto3 data = { - 'ResponseMetadata': { - 'RequestId': 'XXXXXXXXXXXXXXXX', - 'HostId': 'hhhhhhhhhhhhhhhhhhhhhhhhhh', - 'HTTPStatusCode': 200, - 'HTTPHeaders': { - 'x-amz-id-2': 'ababababababababaabababababab', - 'x-amz-request-id': 'XXXXXXXXXXXXXXXX', - 'date': 'Wed, 20 Oct 2021 00:00:00 GMT', - 'last-modified': 'Fri, 22 May 2021 00:00:00 GMT', - 'etag': '"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"', - 'accept-ranges': 'bytes', - 'content-type': 'application/json', - 'server': 'AmazonS3', - 'content-length': '13337' + "ResponseMetadata": { + "RequestId": "XXXXXXXXXXXXXXXX", + "HostId": "hhhhhhhhhhhhhhhhhhhhhhhhhh", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amz-id-2": "ababababababababaabababababab", + "x-amz-request-id": "XXXXXXXXXXXXXXXX", + "date": "Wed, 20 Oct 2021 00:00:00 GMT", + "last-modified": "Fri, 22 May 2021 00:00:00 GMT", + "etag": '"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"', + "accept-ranges": "bytes", + "content-type": "application/json", + "server": "AmazonS3", + "content-length": "13337", }, - 'RetryAttempts': 0 + "RetryAttempts": 0, }, - 'AcceptRanges': 'bytes', - 'LastModified': datetime.datetime(2021, 5, 22, 0, 0, 0), - 'ContentLength': 13409, - 'ETag': '"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"', - 'ContentType': 'application/x-tar', - 'Metadata': {} + "AcceptRanges": "bytes", + "LastModified": datetime.datetime(2021, 5, 22, 0, 0, 0), + "ContentLength": 13409, + "ETag": '"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"', + "ContentType": "application/x-tar", + "Metadata": {}, } expected = { - 'Content-Length': '13 KB (13337 bytes)', - 'Content-Type': 'application/json', - 'Last-Modified': 'Fri, 22 May 2021 00:00:00 GMT', - 'Metadata': {} + "Content-Length": "13 KB (13337 bytes)", + "Content-Type": "application/json", + "Last-Modified": "Fri, 22 May 2021 00:00:00 GMT", + "Metadata": {}, } actual = utils.strip_s3_metadata(data) diff --git a/s3_browser/tokeniser.py b/s3_browser/tokeniser.py index 18360c5..3e601f9 100644 --- a/s3_browser/tokeniser.py +++ b/s3_browser/tokeniser.py @@ -1,58 +1,57 @@ - def tokenise(s): """Breaks a string down into variable tokens""" acc = [] - curr = '' + curr = "" is_var = False is_braced = False is_escaped = False for c in s: if not is_var: - if c == '\\' and not is_escaped: + if c == "\\" and not is_escaped: is_escaped = True continue - if c == '$' and not is_escaped: + if c == "$" and not is_escaped: is_var = True - if curr != '': + if curr != "": acc.append(RawString(curr)) - curr = '' + curr = "" continue - if is_escaped and c != '$': - curr += '\\' + if is_escaped and c != "$": + curr += "\\" is_escaped = False curr += c continue - if c == '{' and curr == '': + if c == "{" and curr == "": is_braced = True continue if is_braced: - if c == '}': + if c == "}": acc.append(Token(curr)) is_var = False is_braced = False - curr = '' + curr = "" continue curr += c continue - if not str.isalnum(c) and c != '_': + if not str.isalnum(c) and c != "_": acc.append(Token(curr)) is_var = False is_braced = False - curr = '' + curr = "" - if c == '\\': + if c == "\\": is_escaped = True - elif c == '$': + elif c == "$": is_var = True else: curr = c @@ -62,9 +61,9 @@ def tokenise(s): # Handle final token if is_var and is_braced: - raise TokeniserException('Unclosed braced variable') + raise TokeniserException("Unclosed braced variable") - if curr != '': + if curr != "": acc.append(Token(curr) if is_var else RawString(curr)) return acc @@ -76,15 +75,15 @@ def render(tokens, context): :param tokens: A list of `Token` and `RawString`s as provided by `tokenise` """ - acc = '' + acc = "" for t in tokens: - v = getattr(t, 'value', None) + v = getattr(t, "value", None) if v: acc += v continue if t.name not in context: - raise TokeniserException('Unknown token \'{}\''.format(t.name)) + raise TokeniserException("Unknown token '{}'".format(t.name)) acc += context[t.name] diff --git a/s3_browser/utils.py b/s3_browser/utils.py index ea40201..e79d98a 100644 --- a/s3_browser/utils.py +++ b/s3_browser/utils.py @@ -1,12 +1,11 @@ import shutil - # TODO: This could probably be a couple of hundred content types _SAFE_CONTENT_TYPE_PREFIXES = [ - 'application/json', - 'application/xml', - 'application/yaml', - 'text/' + "application/json", + "application/xml", + "application/yaml", + "text/", ] @@ -52,7 +51,7 @@ def print_grid(data): for i in range(len(data) % num_cols): groups[i] += 1 - output = [''] * groups[0] + output = [""] * groups[0] i = 0 for g in groups: for j in range(g): @@ -65,16 +64,14 @@ def print_grid(data): def print_dict(data, indent_level=0): """Pretty-print a dict full of key-value metadata pairs""" - indent = ' ' * indent_level + indent = " " * indent_level def _format_key(k): - return '{}{}{: <40}{}'.format( - indent, '\x1b[36m', k + ':', '\x1b[0m' - ) + return "{}{}{: <40}{}".format(indent, "\x1b[36m", k + ":", "\x1b[0m") for k, v in sorted(data.items()): if not isinstance(v, dict): - print('{}{}'.format(_format_key(k), v)) + print("{}{}".format(_format_key(k), v)) else: print(_format_key(k)) print_dict(v, indent_level=indent_level + 1) @@ -88,9 +85,9 @@ def pretty_size(n): size = int(n) shortened = None - for suffix in ('B', 'KB', 'MB', 'GB', 'TB'): - if size <= 1023 or suffix == 'TB': - shortened = '{} {}'.format(round(size), suffix) + for suffix in ("B", "KB", "MB", "GB", "TB"): + if size <= 1023 or suffix == "TB": + shortened = "{} {}".format(round(size), suffix) break size /= 1024 @@ -100,19 +97,19 @@ def pretty_size(n): def strip_s3_metadata(data): """Strip s3 head_object metadata down to the useful stuff""" - metadata = data.get('Metadata', {}) - http_head = data.get('ResponseMetadata', {}).get('HTTPHeaders', {}) + metadata = data.get("Metadata", {}) + http_head = data.get("ResponseMetadata", {}).get("HTTPHeaders", {}) - content_length = int(http_head.get('content-length') or 0) + content_length = int(http_head.get("content-length") or 0) pretty_len = pretty_size(content_length) if pretty_len: - content_length = '{} ({} bytes)'.format(pretty_len, content_length) + content_length = "{} ({} bytes)".format(pretty_len, content_length) return { - 'Content-Length': content_length, - 'Content-Type': http_head.get('content-type'), - 'Last-Modified': http_head.get('last-modified'), - 'Metadata': metadata + "Content-Length": content_length, + "Content-Type": http_head.get("content-type"), + "Last-Modified": http_head.get("last-modified"), + "Metadata": metadata, } @@ -127,12 +124,12 @@ def print_object(obj): the encoding from the content-type header if provided. """ metadata = strip_s3_metadata(obj) - content_type = metadata.get('Content-Type') + content_type = metadata.get("Content-Type") if not _is_safe_content_type(content_type): raise ValueError( 'Refusing to print unsafe content type "{}"'.format(content_type) ) - with obj['Body'] as c: - print(c.read().decode('utf-8'), end='') + with obj["Body"] as c: + print(c.read().decode("utf-8"), end="") diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..7ce9d8d --- /dev/null +++ b/setup.cfg @@ -0,0 +1,20 @@ +[metadata] +name = S3 Browser +summary = An interactive utility for browsing S3 like a regular directory tree +description_file = README.md +author = @giftig +license = MIT + +[flake8] +max-line-length = 80 +select = C,E,F,W,B,B950 +ignore = E203,E501,W503,W504 + +[isort] +combine_as_imports = true +include_trailing_comma = false +line_length = 88 +known_first_party = s3_browser +known_third_party = boto3,pytest +multi_line_output = 3 +order_by_type = false diff --git a/setup.py b/setup.py index bfb326f..ef656e4 100755 --- a/setup.py +++ b/setup.py @@ -1,48 +1,45 @@ #!/usr/bin/env python3 import os + from setuptools import setup def readme_content(): dir = os.path.abspath(os.path.dirname(__file__)) desc = None - with open(os.path.join(dir, 'README.md'), 'r') as f: + with open(os.path.join(dir, "README.md"), "r") as f: desc = f.read() return desc setup( - name='s3_browser', - version='0.3.6', - packages=['s3_browser'], - entry_points={ - 'console_scripts': [ - 's3-browser=s3_browser.cli:main' - ] - }, - install_requires=['boto3>=1.9.0', 'python-magic>=0.4.27'], - python_requires='>=3.2', + name="s3_browser", + version="0.3.6", + packages=["s3_browser"], + entry_points={"console_scripts": ["s3-browser=s3_browser.cli:main"]}, + install_requires=["boto3>=1.9.0", "python-magic>=0.4.27"], + python_requires=">=3.2", long_description=readme_content(), - long_description_content_type='text/markdown', - keywords='aws s3 browser cli interactive prompt s3-browser', - author='Rob Moore', - author_email='giftiger.wunsch@xantoria.com', - license='MIT', - url='https://github.com/giftig/s3-browser/', + long_description_content_type="text/markdown", + keywords="aws s3 browser cli interactive prompt s3-browser", + author="Rob Moore", + author_email="giftiger.wunsch@xantoria.com", + license="MIT", + url="https://github.com/giftig/s3-browser/", classifiers=[ - 'Development Status :: 4 - Beta', - 'Environment :: Console', - 'Intended Audience :: Developers', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: MIT License', - 'Natural Language :: English', - 'Operating System :: OS Independent', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: Implementation :: PyPy', - 'Topic :: Internet', - 'Topic :: Terminals', - 'Topic :: Utilities' - ] + "Development Status :: 4 - Beta", + "Environment :: Console", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: MIT License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: Implementation :: PyPy", + "Topic :: Internet", + "Topic :: Terminals", + "Topic :: Utilities", + ], )