feature: scrub logs #1802

Merged · 33 commits · May 8, 2024
Changes from 11 commits
Commits
33 commits
afca3e2
feat: add delete to s3 wrapper
mSounak Sep 8, 2023
5de92ef
datastore changes to support deleting files
saikonen Apr 16, 2024
0888d35
implement delete logs
saikonen Apr 16, 2024
a3497fc
add delete mode to datastore which inits _attempt
saikonen Apr 16, 2024
2543d07
cleanup debug print
saikonen Apr 19, 2024
9c8a3e8
change logs CLI command to be a cmd group, with a default cmd for bac…
saikonen Apr 19, 2024
c08e6ee
fix logger handling --quiet
saikonen Apr 22, 2024
54c6a7d
fix args parsing for custom click group
saikonen Apr 23, 2024
8dbdb32
another args fix
saikonen Apr 23, 2024
9b20c01
reword delete to scrub. Only delete log files that exist, so as to no…
saikonen Apr 24, 2024
7301df8
add completion msg to log scrubbing
saikonen Apr 24, 2024
e02135d
fix help msg wording
saikonen Apr 25, 2024
11c761f
overwrite logs instead of deleting them
saikonen Apr 25, 2024
94a8220
cleanup unnecessary delete functionality from datastore.
saikonen Apr 25, 2024
a51f222
make it possible to specify a task attempt number to scrub logs for.
saikonen Apr 25, 2024
77fe766
allow specifying --attempt for show logs as well
saikonen Apr 25, 2024
40dd2c7
enable scrubbing of logs without task_id in order to target multiple …
saikonen Apr 25, 2024
c21f2a7
handle attempts in scrubbing better.
saikonen Apr 25, 2024
1ae1262
Merge branch 'master' into feature/delete-logs
saikonen Apr 29, 2024
5dccca0
address logs_cli nits.
saikonen Apr 30, 2024
7da124b
make log scrubbing more robust, do not halt on individual errors, and…
saikonen Apr 30, 2024
f6dbc1f
fix type hint in flowdatastore
saikonen Apr 30, 2024
b541e4f
record metadata after successful scrubbing
saikonen Apr 30, 2024
9828ab2
Revert "record metadata after successful scrubbing"
saikonen Apr 30, 2024
913088e
add flow datastore method for fetching taskdatastores for all attempt…
saikonen May 1, 2024
cddd188
add support for --all flag when scrubbing logs of all task attempts
saikonen May 1, 2024
a05d4cd
add support for specifying to include not-done task attempts in scrub…
saikonen May 1, 2024
e9c988b
refactor: remove new flow_datastore method and combine functionality …
saikonen May 3, 2024
fe5bc32
reword get_latest_task_datastores docstring
saikonen May 7, 2024
5e6245a
rename to get_task_datastores
saikonen May 7, 2024
d1edc5b
limit attempt checks by MAX_ATTEMPTS
saikonen May 7, 2024
470fbed
fix attempt iteration
saikonen May 7, 2024
6e5dcd0
fix attempt_range logic
saikonen May 7, 2024
120 changes: 0 additions & 120 deletions metaflow/cli.py
@@ -287,126 +287,6 @@ def dump(obj, input_path, private=None, max_value_size=None, include=None, file=
echo("Artifacts written to *%s*" % file)


@cli.command(
help="Show stdout/stderr produced by a task or all tasks in a step. "
"The format for input-path is either <run_id>/<step_name> or "
"<run_id>/<step_name>/<task_id>."
)
@click.argument("input-path")
@click.option(
"--stdout/--no-stdout",
default=False,
show_default=True,
help="Show stdout of the task.",
)
@click.option(
"--stderr/--no-stderr",
default=False,
show_default=True,
help="Show stderr of the task.",
)
@click.option(
"--both/--no-both",
default=True,
show_default=True,
help="Show both stdout and stderr of the task.",
)
@click.option(
"--timestamps/--no-timestamps",
default=False,
show_default=True,
help="Show timestamps.",
)
@click.pass_obj
def logs(obj, input_path, stdout=None, stderr=None, both=None, timestamps=False):
types = set()
if stdout:
types.add("stdout")
both = False
if stderr:
types.add("stderr")
both = False
if both:
types.update(("stdout", "stderr"))

streams = list(sorted(types, reverse=True))

# Pathspec can either be run_id/step_name or run_id/step_name/task_id.
parts = input_path.split("/")
if len(parts) == 2:
run_id, step_name = parts
task_id = None
elif len(parts) == 3:
run_id, step_name, task_id = parts
else:
raise CommandException(
"input_path should either be run_id/step_name "
"or run_id/step_name/task_id"
)

datastore_set = TaskDataStoreSet(
obj.flow_datastore, run_id, steps=[step_name], allow_not_done=True
)
if task_id:
ds_list = [
TaskDataStore(
obj.flow_datastore,
run_id=run_id,
step_name=step_name,
task_id=task_id,
mode="r",
allow_not_done=True,
)
]
else:
ds_list = list(datastore_set) # get all tasks

if ds_list:

def echo_unicode(line, **kwargs):
click.secho(line.decode("UTF-8", errors="replace"), **kwargs)

# old style logs are non mflog-style logs
maybe_old_style = True
for ds in ds_list:
echo(
"Dumping logs of run_id=*{run_id}* "
"step=*{step}* task_id=*{task_id}*".format(
run_id=ds.run_id, step=ds.step_name, task_id=ds.task_id
),
fg="magenta",
)

for stream in streams:
echo(stream, bold=True)
logs = ds.load_logs(LOG_SOURCES, stream)
if any(data for _, data in logs):
# attempt to read new, mflog-style logs
for line in mflog.merge_logs([blob for _, blob in logs]):
if timestamps:
ts = mflog.utc_to_local(line.utc_tstamp)
tstamp = ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
click.secho(tstamp + " ", fg=LOGGER_TIMESTAMP, nl=False)
echo_unicode(line.msg)
maybe_old_style = False
elif maybe_old_style:
# if they are not available, we may be looking at
# a legacy run (unless we have seen new-style data already
# for another stream). This returns an empty string if
# nothing is found
log = ds.load_log_legacy(stream)
if log and timestamps:
raise CommandException(
"We can't show --timestamps for old runs. Sorry!"
)
echo_unicode(log, nl=False)
else:
raise CommandException(
"No Tasks found at the given path -- "
"either none exist or none have started yet"
)


# TODO - move step and init under a separate 'internal' subcommand


16 changes: 16 additions & 0 deletions metaflow/datastore/datastore_storage.py
@@ -271,3 +271,19 @@ def load_bytes(self, keys):
duplicate keys.
"""
raise NotImplementedError

def delete_bytes(self, keys):
"""
Deletes objects from the datastore

Parameters
----------
keys : List[str]
Keys to delete

Returns
-------
List[str]
Keys that were deleted
"""
raise NotImplementedError
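
For orientation, here is a minimal sketch of how a filesystem-backed storage class could satisfy this new contract. The class below is hypothetical and not part of this PR; the real local datastore may differ.

import os

class LocalStorageSketch:
    """Hypothetical filesystem backend illustrating the delete_bytes contract."""

    def __init__(self, datastore_root):
        self.datastore_root = datastore_root

    def delete_bytes(self, keys):
        # Only report keys that actually existed and were removed,
        # mirroring the semantics of the S3 implementation further below.
        deleted = []
        for key in keys:
            path = os.path.join(self.datastore_root, key)
            if os.path.isfile(path):
                os.remove(path)
                deleted.append(key)
        return deleted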
57 changes: 57 additions & 0 deletions metaflow/datastore/task_datastore.py
@@ -173,6 +173,18 @@ def __init__(
if data_obj is not None:
self._objects = data_obj.get("objects", {})
self._info = data_obj.get("info", {})
elif self._mode == "d":
self._objects = {}
self._info = {}

if self._attempt is None:
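# no attempt specified: scan attempt metadata to find the most recent attempt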
for i in range(metaflow_config.MAX_ATTEMPTS):
check_meta = self._metadata_name_for_attempt(
self.METADATA_ATTEMPT_SUFFIX, i
)
if self.has_metadata(check_meta, add_attempt=False):
self._attempt = i

else:
raise DataException("Unknown datastore mode: '%s'" % self._mode)

@@ -750,6 +762,33 @@ def save_logs(self, logsource, stream_data):
to_store_dict[n] = data
self._save_file(to_store_dict)

@require_mode("d")
def scrub_logs(self, logsources, stream, attempt_override=None):
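# Map the on-disk log location of each requested source to its logsource name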
path_logsources = {
self._metadata_name_for_attempt(
self._get_log_location(s, stream),
attempt_override=attempt_override,
): s
for s in logsources
}

# Legacy log paths
legacy_log = self._metadata_name_for_attempt(
"%s.log" % stream, attempt_override
)
path_logsources[legacy_log] = stream

paths = path_logsources.keys()

deleted_paths = self._delete_file(paths, add_attempt=False)
# Replace log contents with [REDACTED source stream]
to_store_dict = {
path: bytes("[REDACTED %s %s]" % (path_logsources[path], stream), "utf-8")
for path in deleted_paths
}

self._save_file(to_store_dict, add_attempt=False)

@require_mode("r")
def load_log_legacy(self, stream, attempt_override=None):
"""
@@ -934,3 +973,21 @@ def _load_file(self, names, add_attempt=True):
with open(path, "rb") as f:
results[name] = f.read()
return results

def _delete_file(self, names, add_attempt=True):
"""
Deletes files from the TaskDataStore directory.
"""
to_delete = []
for name in names:
if add_attempt:
path = self._storage_impl.path_join(
self._path, self._metadata_name_for_attempt(name)
)
else:
path = self._storage_impl.path_join(self._path, name)
to_delete.append(path)

deleted_paths = self._storage_impl.delete_bytes(to_delete)

return [self._storage_impl.basename(path) for path in deleted_paths]
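
Putting the new "d" mode and scrub_logs together, a caller (such as the new logs_cli.py, which is not shown in this excerpt) could scrub a task's latest attempt roughly as follows. This is a hedged sketch: the import paths and the allow_not_done argument are assumptions mirroring the read-mode usage removed from cli.py above.

from metaflow.datastore.task_datastore import TaskDataStore
from metaflow.mflog import LOG_SOURCES  # assumed import path

def scrub_task_logs(flow_datastore, run_id, step_name, task_id):
    # Open the task datastore in the new delete/scrub mode; with no attempt
    # given, it resolves the latest attempt on its own (see __init__ above).
    ds = TaskDataStore(
        flow_datastore,
        run_id=run_id,
        step_name=step_name,
        task_id=task_id,
        mode="d",
        allow_not_done=True,  # assumed, mirroring the read-mode call
    )
    for stream in ("stdout", "stderr"):
        ds.scrub_logs(LOG_SOURCES, stream)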
1 change: 1 addition & 0 deletions metaflow/plugins/__init__.py
@@ -14,6 +14,7 @@
("argo-workflows", ".argo.argo_workflows_cli.cli"),
("card", ".cards.card_cli.cli"),
("tag", ".tag_cli.cli"),
("logs", ".logs_cli.cli"),
]

from .test_unbounded_foreach_decorator import InternalTestUnboundedForeachInput
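
The entry above registers a new metaflow/plugins/logs_cli.py that is not included in this excerpt. Going only by the commit messages (a command group with a default show command, plus --attempt and --all options for scrubbing), such a group might be shaped roughly like the hypothetical sketch below; the option names and bodies are assumptions, not the PR's actual code.

import click

@click.group(name="logs", invoke_without_command=True)
@click.pass_context
def cli(ctx):
    # When invoked without a subcommand, fall back to showing logs
    # (the PR implements this with a custom click group).
    if ctx.invoked_subcommand is None:
        pass  # dispatch to `show` in the real implementation

@cli.command(help="Show stdout/stderr produced by a task or all tasks in a step.")
@click.argument("input-path")
@click.option("--attempt", default=None, type=int, help="Attempt number to show logs for.")
def show(input_path, attempt):
    ...

@cli.command(help="Scrub (overwrite) stdout/stderr of a task or all tasks in a step.")
@click.argument("input-path")
@click.option("--attempt", default=None, type=int, help="Attempt number to scrub.")
@click.option("--all", "all_attempts", is_flag=True, help="Scrub logs of all attempts.")
def scrub(input_path, attempt, all_attempts):
    ...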
12 changes: 12 additions & 0 deletions metaflow/plugins/datastores/s3_storage.py
@@ -144,3 +144,15 @@ def iter_results():
yield r.key, None, None

return CloseAfterUse(iter_results(), closer=s3)

def delete_bytes(self, paths):
if len(paths) == 0:
return []

s3 = S3(
s3root=self.datastore_root,
tmproot=ARTIFACT_LOCALROOT,
external_client=self.s3_client,
)

return [path for path in paths if s3.delete(path)]
32 changes: 32 additions & 0 deletions metaflow/plugins/datatools/s3/s3.py
@@ -1342,6 +1342,38 @@ def _check():

return self._put_many_files(_check(), overwrite)

def delete(self, key: str):
"""
Delete a single object in S3.

Parameters
----------
key : str
Object to delete. It can be an S3 url or a path suffix.

Returns
-------
bool
Success/Failure of deleting the key. Failure can be due to key not existing.
"""

url = self._url(key)
src = urlparse(url)

def _delete(s3, tmp):
if not src.path.lstrip("/"):
raise TypeError("'None' type object cannot be deleted.")
else:
return s3.delete_object(Bucket=src.netloc, Key=src.path.lstrip("/"))

# Unfortunately AWS always returns 204 even when deleting non-existent keys, so we need to perform a check before the delete op
# in order to be able to provide info on deletion success/failure
obj = self.info(key, return_missing=True)
if not obj.exists:
return False

self._one_boto_op(_delete, url, create_tmp_file=False)
return True

def _one_boto_op(self, op, url, create_tmp_file=True):
error = ""
for i in range(S3_RETRY_COUNT + 1):
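
For completeness, a small usage sketch of the new delete helper through the user-facing S3 client; the bucket and key below are illustrative only.

from metaflow import S3

# delete() returns False when the key does not exist, because S3 itself
# responds 204 either way (hence the existence check in the code above).
with S3(s3root="s3://my-bucket/metaflow") as s3:
    if s3.delete("MyFlow/123/start/456/0.task_stdout.log"):
        print("deleted")
    else:
        print("key did not exist")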