feature: scrub logs #1802

Merged on May 8, 2024 (33 commits)
Commits
afca3e2
feat: add delete to s3 wrapper
mSounak Sep 8, 2023
5de92ef
datastore changes to support deleting files
saikonen Apr 16, 2024
0888d35
implement delete logs
saikonen Apr 16, 2024
a3497fc
add delete mode to datastore which inits _attempt
saikonen Apr 16, 2024
2543d07
cleanup debug print
saikonen Apr 19, 2024
9c8a3e8
change logs CLI command to be a cmd group, with a default cmd for bac…
saikonen Apr 19, 2024
c08e6ee
fix logger handling --quiet
saikonen Apr 22, 2024
54c6a7d
fix args parsing for custom click group
saikonen Apr 23, 2024
8dbdb32
another args fix
saikonen Apr 23, 2024
9b20c01
reword delete to scrub. Only delete log files that exist, so as to no…
saikonen Apr 24, 2024
7301df8
add completion msg to log scrubbing
saikonen Apr 24, 2024
e02135d
fix help msg wording
saikonen Apr 25, 2024
11c761f
overwrite logs instead of deleting them
saikonen Apr 25, 2024
94a8220
cleanup unnecessary delete functionality from datastore.
saikonen Apr 25, 2024
a51f222
make it possible to specify a task attempt number to scrub logs for.
saikonen Apr 25, 2024
77fe766
allow specifying --attempt for show logs as well
saikonen Apr 25, 2024
40dd2c7
enable scrubbing of logs without task_id in order to target multiple …
saikonen Apr 25, 2024
c21f2a7
handle attempts in scrubbing better.
saikonen Apr 25, 2024
1ae1262
Merge branch 'master' into feature/delete-logs
saikonen Apr 29, 2024
5dccca0
address logs_cli nits.
saikonen Apr 30, 2024
7da124b
make log scrubbing more robust, do not halt on individual errors, and…
saikonen Apr 30, 2024
f6dbc1f
fix type hint in flowdatastore
saikonen Apr 30, 2024
b541e4f
record metadata after successful scrubbing
saikonen Apr 30, 2024
9828ab2
Revert "record metadata after successful scrubbing"
saikonen Apr 30, 2024
913088e
add flow datastore method for fetching taskdatastores for all attempt…
saikonen May 1, 2024
cddd188
add support for --all flag when scrubbing logs of all task attempts
saikonen May 1, 2024
a05d4cd
add support for specifying to include not-done task attempts in scrub…
saikonen May 1, 2024
e9c988b
refactor: remove new flow_datastore method and combine functionality …
saikonen May 3, 2024
fe5bc32
reword get_latest_task_datastores docstring
saikonen May 7, 2024
5e6245a
rename to get_task_datastores
saikonen May 7, 2024
d1edc5b
limit attempt checks by MAX_ATTEMPTS
saikonen May 7, 2024
470fbed
fix attempt iteration
saikonen May 7, 2024
6e5dcd0
fix attempt_range logic
saikonen May 7, 2024
120 changes: 0 additions & 120 deletions metaflow/cli.py
@@ -287,126 +287,6 @@ def dump(obj, input_path, private=None, max_value_size=None, include=None, file=
echo("Artifacts written to *%s*" % file)


@cli.command(
help="Show stdout/stderr produced by a task or all tasks in a step. "
"The format for input-path is either <run_id>/<step_name> or "
"<run_id>/<step_name>/<task_id>."
)
@click.argument("input-path")
@click.option(
"--stdout/--no-stdout",
default=False,
show_default=True,
help="Show stdout of the task.",
)
@click.option(
"--stderr/--no-stderr",
default=False,
show_default=True,
help="Show stderr of the task.",
)
@click.option(
"--both/--no-both",
default=True,
show_default=True,
help="Show both stdout and stderr of the task.",
)
@click.option(
"--timestamps/--no-timestamps",
default=False,
show_default=True,
help="Show timestamps.",
)
@click.pass_obj
def logs(obj, input_path, stdout=None, stderr=None, both=None, timestamps=False):
types = set()
if stdout:
types.add("stdout")
both = False
if stderr:
types.add("stderr")
both = False
if both:
types.update(("stdout", "stderr"))

streams = list(sorted(types, reverse=True))

# Pathspec can either be run_id/step_name or run_id/step_name/task_id.
parts = input_path.split("/")
if len(parts) == 2:
run_id, step_name = parts
task_id = None
elif len(parts) == 3:
run_id, step_name, task_id = parts
else:
raise CommandException(
"input_path should either be run_id/step_name "
"or run_id/step_name/task_id"
)

datastore_set = TaskDataStoreSet(
obj.flow_datastore, run_id, steps=[step_name], allow_not_done=True
)
if task_id:
ds_list = [
TaskDataStore(
obj.flow_datastore,
run_id=run_id,
step_name=step_name,
task_id=task_id,
mode="r",
allow_not_done=True,
)
]
else:
ds_list = list(datastore_set) # get all tasks

if ds_list:

def echo_unicode(line, **kwargs):
click.secho(line.decode("UTF-8", errors="replace"), **kwargs)

# old style logs are non mflog-style logs
maybe_old_style = True
for ds in ds_list:
echo(
"Dumping logs of run_id=*{run_id}* "
"step=*{step}* task_id=*{task_id}*".format(
run_id=ds.run_id, step=ds.step_name, task_id=ds.task_id
),
fg="magenta",
)

for stream in streams:
echo(stream, bold=True)
logs = ds.load_logs(LOG_SOURCES, stream)
if any(data for _, data in logs):
# attempt to read new, mflog-style logs
for line in mflog.merge_logs([blob for _, blob in logs]):
if timestamps:
ts = mflog.utc_to_local(line.utc_tstamp)
tstamp = ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
click.secho(tstamp + " ", fg=LOGGER_TIMESTAMP, nl=False)
echo_unicode(line.msg)
maybe_old_style = False
elif maybe_old_style:
# if they are not available, we may be looking at
# a legacy run (unless we have seen new-style data already
# for another stream). This return an empty string if
# nothing is found
log = ds.load_log_legacy(stream)
if log and timestamps:
raise CommandException(
"We can't show --timestamps for old runs. Sorry!"
)
echo_unicode(log, nl=False)
else:
raise CommandException(
"No Tasks found at the given path -- "
"either none exist or none have started yet"
)


# TODO - move step and init under a separate 'internal' subcommand


130 changes: 128 additions & 2 deletions metaflow/datastore/flow_datastore.py
@@ -68,7 +68,7 @@ def datastore_root(self):
return self._storage_impl.datastore_root

def get_latest_task_datastores(
self, run_id=None, steps=None, pathspecs=None, allow_not_done=False
self, run_id=None, steps=None, pathspecs=None, allow_not_done=False, mode="r"
):
"""
Return a list of TaskDataStore for a subset of the tasks.
@@ -93,6 +93,8 @@ def get_latest_task_datastores(
allow_not_done : bool, optional
If True, returns the latest attempt of a task even if that attempt
wasn't marked as done, by default False
mode : str, default "r"
Mode to initialize the returned TaskDataStores in.

Returns
-------
@@ -172,11 +174,135 @@ def get_latest_task_datastores(
else:
latest_to_fetch = latest_started_attempts & done_attempts
latest_to_fetch = [
(v[0], v[1], v[2], v[3], data_objs.get(v), "r", allow_not_done)
(v[0], v[1], v[2], v[3], data_objs.get(v), mode, allow_not_done)
for v in latest_to_fetch
]
return list(itertools.starmap(self.get_task_datastore, latest_to_fetch))

def get_task_datastores(
self,
run_id=None,
steps=None,
pathspecs=None,
allow_not_done=False,
attempt=None,
mode="r",
):
"""
Return a list of TaskDataStore for a subset of the tasks.

We filter the list based on `steps` if non-None.
Alternatively, `pathspecs` can contain the exact list of pathspec(s)
(run_id/step_name/task_id) that should be filtered.
Note: When `pathspecs` is specified, we expect strict consistency and
not eventual consistency in contrast to other modes.

Parameters
----------
run_id : str, optional
Run ID to get the tasks from. If not specified, use pathspecs,
by default None
steps : List[str] , optional
Steps to get the tasks from. If run_id is specified, this
must also be specified, by default None
pathspecs : List[str], optional
Full task specs (run_id/step_name/task_id). Can be used instead of
specifying run_id and steps, by default None
allow_not_done : bool, optional
If True, returns the latest or specified attempt of a task even if that attempt
wasn't marked as done, by default False
attempt: int, optional
Attempt number of the tasks to return
mode : str, default "r"
Mode to initialize the returned TaskDataStores in.

Returns
-------
List[TaskDataStore]
Task datastores for all the attempts of the tasks specified.
"""
task_urls = []
# Note: When `pathspecs` is specified, we avoid the potentially
# eventually consistent `list_content` operation, and directly construct
# the task_urls list.
if pathspecs:
task_urls = [
self._storage_impl.path_join(self.flow_name, pathspec)
for pathspec in pathspecs
]
else:
run_prefix = self._storage_impl.path_join(self.flow_name, run_id)
if steps:
step_urls = [
self._storage_impl.path_join(run_prefix, step) for step in steps
]
else:
step_urls = [
step.path
for step in self._storage_impl.list_content([run_prefix])
if step.is_file is False
]
task_urls = [
task.path
for task in self._storage_impl.list_content(step_urls)
if task.is_file is False
]
urls = []
for task_url in task_urls:
# parse content urls for specific attempt only, or for all attempts in max range
attempt_range = (
[attempt]
if attempt is not None
else range(metaflow_config.MAX_ATTEMPTS)
)
for attempt in attempt_range:
for suffix in [
TaskDataStore.METADATA_DATA_SUFFIX,
TaskDataStore.METADATA_ATTEMPT_SUFFIX,
TaskDataStore.METADATA_DONE_SUFFIX,
]:
urls.append(
self._storage_impl.path_join(
task_url,
TaskDataStore.metadata_name_for_attempt(suffix, attempt),
)
)
latest_started_attempts = {}
done_attempts = set()
data_objs = {}
with self._storage_impl.load_bytes(urls) as get_results:
for key, path, meta in get_results:
if path is not None:
_, run, step, task, fname = self._storage_impl.path_split(key)
attempt, fname = TaskDataStore.parse_attempt_metadata(fname)
attempt = int(attempt)
if fname == TaskDataStore.METADATA_DONE_SUFFIX:
done_attempts.add((run, step, task, attempt))
elif fname == TaskDataStore.METADATA_ATTEMPT_SUFFIX:
latest_started_attempts[(run, step, task)] = max(
latest_started_attempts.get((run, step, task), 0), attempt
)
elif fname == TaskDataStore.METADATA_DATA_SUFFIX:
# This somewhat breaks the abstraction since we are using
# load_bytes directly instead of load_metadata
with open(path, encoding="utf-8") as f:
data_objs[(run, step, task, attempt)] = json.load(f)

latest_started_attempts = set(
(run, step, task, attempt)
for (run, step, task), attempt in latest_started_attempts.items()
)
if allow_not_done:
to_fetch = done_attempts.union(latest_started_attempts)
else:
to_fetch = done_attempts

to_fetch = [
(v[0], v[1], v[2], v[3], data_objs.get(v), mode, allow_not_done)
for v in to_fetch
]
return list(itertools.starmap(self.get_task_datastore, to_fetch))

def get_task_datastore(
self,
run_id,
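A minimal sketch, not part of this PR, of how the new get_task_datastores method could be driven to scrub logs for one step. The scrub_step_logs helper is hypothetical, the metaflow.mflog.LOG_SOURCES import path is assumed, and the real CLI wiring lives in the new logs_cli plugin rather than in code like this.

from metaflow.mflog import LOG_SOURCES  # assumed import path

def scrub_step_logs(flow_datastore, run_id, step_name, attempt=None, include_not_done=False):
    # Open the matching task datastores in delete ("d") mode so that
    # destructive operations such as scrub_logs are permitted.
    datastores = flow_datastore.get_task_datastores(
        run_id=run_id,
        steps=[step_name],
        attempt=attempt,                  # None considers all attempts up to MAX_ATTEMPTS
        allow_not_done=include_not_done,  # include attempts without a DONE marker
        mode="d",
    )
    for ds in datastores:
        for stream in ("stdout", "stderr"):
            ds.scrub_logs(LOG_SOURCES, stream)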
50 changes: 50 additions & 0 deletions metaflow/datastore/task_datastore.py
@@ -173,6 +173,26 @@ def __init__(
if data_obj is not None:
self._objects = data_obj.get("objects", {})
self._info = data_obj.get("info", {})
elif self._mode == "d":
self._objects = {}
self._info = {}

if self._attempt is None:
for i in range(metaflow_config.MAX_ATTEMPTS):
check_meta = self._metadata_name_for_attempt(
self.METADATA_ATTEMPT_SUFFIX, i
)
if self.has_metadata(check_meta, add_attempt=False):
self._attempt = i

# Do not allow destructive operations on the datastore if attempt is still in flight
# and we explicitly did not allow operating on running tasks.
if not allow_not_done and not self.has_metadata(self.METADATA_DONE_SUFFIX):
Review comment (Contributor):
is there a case where we want to delete logs for running tasks? It seems in delete mode we may want to ignore allow_not_done?

Reply (saikonen, Collaborator Author, Apr 30, 2024):
I left this in mainly in order to keep the datastore side more generic (allowing delete for in-flight attempts as well, though no use case exists yet). It is definitely not required for this feature, so I could remove it

raise DataException(
"No completed attempts of the task was found for task '%s'"
% self._path
)

else:
raise DataException("Unknown datastore mode: '%s'" % self._mode)

@@ -750,6 +770,36 @@ def save_logs(self, logsource, stream_data):
to_store_dict[n] = data
self._save_file(to_store_dict)

@require_mode("d")
def scrub_logs(self, logsources, stream, attempt_override=None):
path_logsources = {
self._metadata_name_for_attempt(
self._get_log_location(s, stream),
attempt_override=attempt_override,
): s
for s in logsources
}

# Legacy log paths
legacy_log = self._metadata_name_for_attempt(
"%s.log" % stream, attempt_override
)
path_logsources[legacy_log] = stream

existing_paths = [
path
for path in path_logsources.keys()
if self.has_metadata(path, add_attempt=False)
]

# Replace log contents with [REDACTED source stream]
to_store_dict = {
path: bytes("[REDACTED %s %s]" % (path_logsources[path], stream), "utf-8")
for path in existing_paths
}

self._save_file(to_store_dict, add_attempt=False, allow_overwrite=True)

@require_mode("r")
def load_log_legacy(self, stream, attempt_override=None):
"""
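A design note on the diff above: scrub_logs overwrites log objects in place rather than deleting them, so the datastore layout for the attempt stays intact. The standalone snippet below (illustrative only; the attempt-prefixed file names are assumed examples, not guaranteed Metaflow paths) mirrors the redaction mapping that scrub_logs builds.

def redacted_payloads(existing_paths, path_logsources, stream):
    # Map each existing log path to the bytes that will overwrite it,
    # keeping the object but replacing its contents with a marker.
    return {
        path: ("[REDACTED %s %s]" % (path_logsources[path], stream)).encode("utf-8")
        for path in existing_paths
    }

# Hypothetical attempt-0 paths: one mflog-style file and one legacy file for stdout.
paths = {"0.task_stdout.log": "task", "0.stdout.log": "stdout"}
print(redacted_payloads(list(paths), paths, "stdout"))
# -> {'0.task_stdout.log': b'[REDACTED task stdout]', '0.stdout.log': b'[REDACTED stdout stdout]'}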
1 change: 1 addition & 0 deletions metaflow/plugins/__init__.py
@@ -14,6 +14,7 @@
("argo-workflows", ".argo.argo_workflows_cli.cli"),
("card", ".cards.card_cli.cli"),
("tag", ".tag_cli.cli"),
("logs", ".logs_cli.cli"),
]

from .test_unbounded_foreach_decorator import InternalTestUnboundedForeachInput
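The commits above mention turning the logs CLI command into a command group with a default subcommand, so that the old "logs <pathspec>" invocation keeps working. Below is a generic sketch of that Click pattern; it is not the PR's actual logs_cli implementation, only an illustration of the backward-compatibility idea, with "show" and "scrub" as assumed subcommand names.

import click

class DefaultCommandGroup(click.Group):
    """A click Group that falls back to a default subcommand when the first
    argument is not a known subcommand, so `logs <pathspec>` keeps behaving
    like the old `logs show <pathspec>`."""

    default_cmd = "show"

    def parse_args(self, ctx, args):
        if args and not args[0].startswith("-") and args[0] not in self.commands:
            # Prepend the default subcommand so old invocations still work.
            args = [self.default_cmd] + args
        return super().parse_args(ctx, args)

@click.group(cls=DefaultCommandGroup)
def cli():
    pass

@cli.command(help="Show logs for a pathspec.")
@click.argument("input-path")
def show(input_path):
    click.echo("showing logs for %s" % input_path)

@cli.command(help="Scrub logs for a pathspec.")
@click.argument("input-path")
def scrub(input_path):
    click.echo("scrubbing logs for %s" % input_path)

if __name__ == "__main__":
    cli()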